You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/24 00:11:25 UTC

[1/3] mesos git commit: Refactored and simplified the docker puller interfaces.

Repository: mesos
Updated Branches:
  refs/heads/master 60f211afe -> 14f070fda


Refactored and simplified the docker puller interfaces.

Review: https://reviews.apache.org/r/43801


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/caf6c029
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/caf6c029
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/caf6c029

Branch: refs/heads/master
Commit: caf6c02989cfc9236d0ae03eaad4844d6763d5c0
Parents: 60f211a
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Feb 19 21:41:01 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:07 2016 -0800

----------------------------------------------------------------------
 .../mesos/provisioner/docker/local_puller.cpp   | 220 ++++++++++++-------
 .../mesos/provisioner/docker/local_puller.hpp   |  11 +-
 .../mesos/provisioner/docker/paths.cpp          |  55 ++---
 .../mesos/provisioner/docker/paths.hpp          |  48 ++--
 .../mesos/provisioner/docker/puller.cpp         | 133 +----------
 .../mesos/provisioner/docker/puller.hpp         |  47 +---
 .../provisioner/docker/registry_puller.cpp      | 213 ++++++++++--------
 .../provisioner/docker/registry_puller.hpp      |  36 ++-
 .../mesos/provisioner/docker/store.cpp          | 139 ++++++------
 .../mesos/provisioner/docker/store.hpp          |   2 +-
 .../containerizer/provisioner_docker_tests.cpp  |  37 ++--
 11 files changed, 404 insertions(+), 537 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
index f3e7c04..dfd32dc 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
@@ -14,8 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <list>
-#include <map>
+#include <string>
 #include <vector>
 
 #include <glog/logging.h>
@@ -28,22 +27,18 @@
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
+#include <process/process.hpp>
 
-#include "common/status_utils.hpp"
 #include "common/command_utils.hpp"
 
 #include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
 
 using namespace process;
 
 namespace spec = docker::spec;
 
 using std::list;
-using std::map;
-using std::pair;
 using std::string;
 using std::vector;
 
@@ -55,23 +50,32 @@ namespace docker {
 class LocalPullerProcess : public Process<LocalPullerProcess>
 {
 public:
-  LocalPullerProcess(const string& _archivesDir) : archivesDir(_archivesDir) {}
+  LocalPullerProcess(const string& _archivesDir)
+    : archivesDir(_archivesDir) {}
 
   ~LocalPullerProcess() {}
 
-  Future<list<pair<string, string>>> pull(
+  Future<vector<string>> pull(
       const spec::ImageReference& reference,
       const string& directory);
 
 private:
-  Future<list<pair<string, string>>> putImage(
+  Future<vector<string>> _pull(
       const spec::ImageReference& reference,
       const string& directory);
 
-  Future<list<pair<string, string>>> putLayers(
+  Result<string> getParentLayerId(
+      const string& directory,
+      const string& layerId);
+
+  Future<Nothing> extractLayers(
       const string& directory,
       const vector<string>& layerIds);
 
+  Future<Nothing> extractLayer(
+      const string& directory,
+      const string& layerId);
+
   const string archivesDir;
 };
 
@@ -90,7 +94,7 @@ Try<Owned<Puller>> LocalPuller::create(const Flags& flags)
 }
 
 
-LocalPuller::LocalPuller(Owned<LocalPullerProcess>& _process)
+LocalPuller::LocalPuller(Owned<LocalPullerProcess> _process)
   : process(_process)
 {
   spawn(process.get());
@@ -104,16 +108,19 @@ LocalPuller::~LocalPuller()
 }
 
 
-Future<list<pair<string, string>>> LocalPuller::pull(
+Future<vector<string>> LocalPuller::pull(
     const spec::ImageReference& reference,
-    const Path& directory)
+    const string& directory)
 {
   return dispatch(
-      process.get(), &LocalPullerProcess::pull, reference, directory);
+      process.get(),
+      &LocalPullerProcess::pull,
+      reference,
+      directory);
 }
 
 
-Future<list<pair<string, string>>> LocalPullerProcess::pull(
+Future<vector<string>> LocalPullerProcess::pull(
     const spec::ImageReference& reference,
     const string& directory)
 {
@@ -133,110 +140,157 @@ Future<list<pair<string, string>>> LocalPullerProcess::pull(
           << "' to '" << directory << "'";
 
   return command::untar(Path(tarPath), Path(directory))
-    .then(defer(self(), &Self::putImage, reference, directory));
+    .then(defer(self(), &Self::_pull, reference, directory));
 }
 
 
-static Result<string> getParentId(
-    const string& directory,
-    const string& layerId)
+Future<vector<string>> LocalPullerProcess::_pull(
+    const spec::ImageReference& reference,
+    const string& directory)
 {
-  Try<string> manifest =
-    os::read(paths::getImageArchiveLayerManifestPath(directory, layerId));
-
-  if (manifest.isError()) {
-    return Error("Failed to read manifest: " + manifest.error());
+  // We first parse the 'repositories' JSON file to get the topmost
+  // layer id for the image.
+  Try<string> _repositories = os::read(path::join(directory, "repositories"));
+  if (_repositories.isError()) {
+    return Failure("Failed to read 'repositories': " + _repositories.error());
   }
 
-  Try<JSON::Object> json = JSON::parse<JSON::Object>(manifest.get());
-  if (json.isError()) {
-    return Error("Failed to parse manifest: " + json.error());
-  }
+  Try<JSON::Object> repositories =
+    JSON::parse<JSON::Object>(_repositories.get());
 
-  Result<JSON::String> parentId = json.get().find<JSON::String>("parent");
-  if (parentId.isNone() || (parentId.isSome() && parentId.get() == "")) {
-    return None();
-  } else if (parentId.isError()) {
-    return Error("Failed to read parent of layer: " + parentId.error());
+  if (repositories.isError()) {
+    return Failure("Failed to parse 'repositories': " + repositories.error());
   }
 
-  return parentId.get().value;
-}
+  Result<JSON::Object> repository =
+    repositories->find<JSON::Object>(reference.repository());
 
+  if (repository.isError()) {
+    return Failure(
+        "Failed to find repository '" + reference.repository() +
+        "' in 'repositories': " + repository.error());
+  } else if (repository.isNone()) {
+    return Failure(
+        "Repository '" + reference.repository() + "' does not "
+        "exist in 'repositories'");
+  }
 
-Future<list<pair<string, string>>> LocalPullerProcess::putImage(
-    const spec::ImageReference& reference,
-    const string& directory)
-{
-  Try<string> value =
-    os::read(paths::getImageArchiveRepositoriesPath(directory));
+  const string tag = reference.has_tag()
+    ? reference.tag()
+    : "latest";
 
-  if (value.isError()) {
-    return Failure("Failed to read repository JSON: " + value.error());
+  // NOTE: We don't use JSON find here since a tag might contain '.'.
+  if (repository->values.count(tag) == 0) {
+    return Failure("Tag '" + tag + "' is not found");
   }
 
-  Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get());
-  if (json.isError()) {
-    return Failure("Failed to parse JSON: " + json.error());
+  JSON::Value _layerId = repository->values.at(tag);
+  if (!_layerId.is<JSON::String>()) {
+    return Failure("Layer id is not a string");
   }
 
-  Result<JSON::Object> repositoryValue =
-    json.get().find<JSON::Object>(reference.repository());
+  string layerId = _layerId.as<JSON::String>().value;
+
+  // Traverse the parent chain to find all parent image layer ids. We
+  // assume that all the parent layers are part of the archive tar and
+  // are thus already extracted under 'directory'.
+  vector<string> layerIds = { layerId };
+  Result<string> parentLayerId = getParentLayerId(directory, layerId);
+  while (parentLayerId.isSome()) {
+    // NOTE: We put parent layer ids in front because that's what the
+    // provisioner backends assume.
+    layerIds.insert(layerIds.begin(), parentLayerId.get());
+    parentLayerId = getParentLayerId(directory, parentLayerId.get());
+  }
 
-  if (repositoryValue.isError()) {
-    return Failure("Failed to find repository: " + repositoryValue.error());
-  } else if (repositoryValue.isNone()) {
-    return Failure("Repository '" + reference.repository() + "' is not found");
+  if (parentLayerId.isError()) {
+    return Failure(
+        "Failed to find parent layer id for layer '" + layerId +
+        "': " + parentLayerId.error());
   }
 
-  const JSON::Object repositoryJson = repositoryValue.get();
+  return extractLayers(directory, layerIds)
+    .then([layerIds]() { return layerIds; });
+}
 
-  const string tag = reference.has_tag()
-    ? reference.tag()
-    : "latest";
 
-  // We don't use JSON find here because a tag might contain a '.'.
-  map<string, JSON::Value>::const_iterator entry =
-    repositoryJson.values.find(tag);
+Result<string> LocalPullerProcess::getParentLayerId(
+    const string& directory,
+    const string& layerId)
+{
+  const string layerPath = path::join(directory, layerId);
 
-  if (entry == repositoryJson.values.end()) {
-    return Failure("Tag '" + tag + "' is not found");
-  } else if (!entry->second.is<JSON::String>()) {
-    return Failure("Tag JSON value expected to be JSON::String");
+  Try<string> _manifest = os::read(paths::getImageLayerManifestPath(layerPath));
+  if (_manifest.isError()) {
+    return Error("Failed to read manifest: " + _manifest.error());
   }
 
-  const string layerId = entry->second.as<JSON::String>().value;
-
-  vector<string> layerIds;
-  layerIds.push_back(layerId);
-  Result<string> parentId = getParentId(directory, layerId);
-
-  while (parentId.isSome()) {
-    layerIds.insert(layerIds.begin(), parentId.get());
-    parentId = getParentId(directory, parentId.get());
+  Try<JSON::Object> manifest = JSON::parse<JSON::Object>(_manifest.get());
+  if (manifest.isError()) {
+    return Error("Failed to parse manifest: " + manifest.error());
   }
 
-  if (parentId.isError()) {
-    return Failure("Failed to find parent layer id of layer '" + layerId +
-                   "': " + parentId.error());
+  Result<JSON::Value> parentLayerId = manifest->find<JSON::Value>("parent");
+  if (parentLayerId.isError()) {
+    return Error("Failed to parse 'parent': " + parentLayerId.error());
+  } else if (parentLayerId.isNone()) {
+    return None();
+  } else if (parentLayerId->is<JSON::Null>()) {
+    return None();
+  } else if (!parentLayerId->is<JSON::String>()) {
+    return Error("Unexpected 'parent' type");
   }
 
-  return putLayers(directory, layerIds);
+  const string id = parentLayerId->as<JSON::String>().value;
+  if (id == "") {
+    return None();
+  } else {
+    return id;
+  }
 }
 
 
-Future<list<pair<string, string>>> LocalPullerProcess::putLayers(
+Future<Nothing> LocalPullerProcess::extractLayers(
     const string& directory,
     const vector<string>& layerIds)
 {
-  list<Future<pair<string, string>>> futures;
+  list<Future<Nothing>> futures;
   foreach (const string& layerId, layerIds) {
-    const string tarredLayer =
-      paths::getImageArchiveLayerTarPath(directory, layerId);
-    futures.push_back(untarLayer(tarredLayer, directory, layerId));
+    futures.push_back(extractLayer(directory, layerId));
+  }
+
+  return collect(futures)
+    .then([]() { return Nothing(); });
+}
+
+
+Future<Nothing> LocalPullerProcess::extractLayer(
+    const string& directory,
+    const string& layerId)
+{
+  const string layerPath = path::join(directory, layerId);
+  const string tar = paths::getImageLayerTarPath(layerPath);
+  const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+
+  Try<Nothing> mkdir = os::mkdir(rootfs);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" + rootfs + "'"
+        ": " + mkdir.error());
   }
 
-  return collect(futures);
+  return command::untar(Path(tar), Path(rootfs))
+    .then([tar]() -> Future<Nothing> {
+      // Remove the tar after the extraction.
+      Try<Nothing> rm = os::rm(tar);
+      if (rm.isError()) {
+        return Failure(
+          "Failed to remove '" + tar + "' "
+          "after extraction: " + rm.error());
+      }
+
+      return Nothing();
+    });
 }
 
 } // namespace docker {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
index 811c24b..5f5aaa3 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
@@ -17,9 +17,9 @@
 #ifndef __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
 #define __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
 
-#include <mesos/docker/spec.hpp>
+#include <process/owned.hpp>
 
-#include "slave/containerizer/mesos/provisioner/store.hpp"
+#include <mesos/docker/spec.hpp>
 
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
 
@@ -46,15 +46,14 @@ public:
 
   ~LocalPuller();
 
-  process::Future<std::list<std::pair<std::string, std::string>>> pull(
+  process::Future<std::vector<std::string>> pull(
       const ::docker::spec::ImageReference& reference,
-      const Path& directory);
+      const std::string& directory);
 
 private:
-  explicit LocalPuller(process::Owned<LocalPullerProcess>& _process);
+  explicit LocalPuller(process::Owned<LocalPullerProcess> _process);
 
   LocalPuller(const LocalPuller&) = delete; // Not copyable.
-
   LocalPuller& operator=(const LocalPuller&) = delete; // Not assignable.
 
   process::Owned<LocalPullerProcess> process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
index 82d92a2..a5cc952 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
@@ -38,74 +38,51 @@ string getStagingTempDir(const string& storeDir)
 }
 
 
-string getImageArchiveTarPath(
-    const string& discoveryDir,
-    const string& name)
+string getImageLayerPath(const string& storeDir, const string& layerId)
 {
-  return path::join(discoveryDir, name + ".tar");
-}
-
-
-string getImageArchiveRepositoriesPath(const string& archivePath)
-{
-  return path::join(archivePath, "repositories");
+  return path::join(storeDir, "layers", layerId);
 }
 
 
-string getImageArchiveLayerPath(
-    const string& archivePath,
-    const string& layerId)
+string getImageLayerManifestPath(const string& layerPath)
 {
-  return path::join(archivePath, layerId);
+  return path::join(layerPath, "json");
 }
 
 
-string getImageArchiveLayerManifestPath(
-    const string& archivePath,
-    const string& layerId)
+string getImageLayerManifestPath(const string& storeDir, const string& layerId)
 {
-  return path::join(getImageArchiveLayerPath(archivePath, layerId), "json");
+  return getImageLayerManifestPath(getImageLayerPath(storeDir, layerId));
 }
 
 
-string getImageArchiveLayerTarPath(
-    const string& archivePath,
-    const string& layerId)
+string getImageLayerRootfsPath(const string& layerPath)
 {
-  return path::join(
-      getImageArchiveLayerPath(archivePath, layerId), "layer.tar");
+  return path::join(layerPath, "rootfs");
 }
 
 
-string getImageArchiveLayerRootfsPath(
-    const string& archivePath,
-    const string& layerId)
+string getImageLayerRootfsPath(const string& storeDir, const string& layerId)
 {
-  return path::join(getImageArchiveLayerPath(archivePath, layerId), "rootfs");
+  return getImageLayerRootfsPath(getImageLayerPath(storeDir, layerId));
 }
 
 
-string getImageLayerPath(
-    const string& storeDir,
-    const string& layerId)
+string getImageLayerTarPath(const string& layerPath)
 {
-  return path::join(storeDir, "layers", layerId);
+  return path::join(layerPath, "layer.tar");
 }
 
 
-string getImageLayerManifestPath(
-    const string& storeDir,
-    const string& layerId)
+string getImageLayerTarPath(const string& storeDir, const string& layerId)
 {
-  return path::join(getImageLayerPath(storeDir, layerId), "json");
+  return getImageLayerTarPath(getImageLayerPath(storeDir, layerId));
 }
 
 
-string getImageLayerRootfsPath(
-    const string& storeDir,
-    const string& layerId)
+string getImageArchiveTarPath(const string& discoveryDir, const string& name)
 {
-  return path::join(getImageLayerPath(storeDir, layerId), "rootfs");
+  return path::join(discoveryDir, name + ".tar");
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
index d2b0cf9..949e0c1 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
@@ -17,11 +17,8 @@
 #ifndef __PROVISIONER_DOCKER_PATHS_HPP__
 #define __PROVISIONER_DOCKER_PATHS_HPP__
 
-#include <list>
 #include <string>
 
-#include <mesos/mesos.hpp>
-
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -32,8 +29,7 @@ namespace paths {
  * The Docker store file system layout is as follows:
  * Image store dir ('--docker_store_dir' slave flag)
  *    |--staging
- *       |-- repositories(containing all images info as JSON)
- *       |-- <temp_dir_archive>
+ *       |-- <staging_tmp_dir_XXXXXX>
  *           |-- <layer_id>
  *               |-- rootfs
  *               |-- json(manifest)
@@ -53,49 +49,43 @@ std::string getStagingDir(const std::string& storeDir);
 std::string getStagingTempDir(const std::string& storeDir);
 
 
-std::string getImageArchiveTarPath(
-    const std::string& discoveryDir,
-    const std::string& name);
-
-
-std::string getImageArchiveRepositoriesPath(const std::string& archivePath);
-
-
-std::string getImageArchiveLayerPath(
-    const std::string& archivePath,
+std::string getImageLayerPath(
+    const std::string& storeDir,
     const std::string& layerId);
 
 
-std::string getImageArchiveLayerManifestPath(
-    const std::string& archivePath,
-    const std::string& layerId);
+std::string getImageLayerManifestPath(
+    const std::string& layerPath);
 
 
-std::string getImageArchiveLayerTarPath(
-  const std::string& archivePath,
-  const std::string& layerId);
+std::string getImageLayerManifestPath(
+    const std::string& storeDir,
+    const std::string& layerId);
 
 
-std::string getImageArchiveLayerRootfsPath(
-  const std::string& archivePath,
-  const std::string& layerId);
+std::string getImageLayerRootfsPath(
+    const std::string& layerPath);
 
 
-std::string getImageLayerPath(
+std::string getImageLayerRootfsPath(
     const std::string& storeDir,
     const std::string& layerId);
 
 
-std::string getImageLayerManifestPath(
-    const std::string& storeDir,
-    const std::string& layerId);
+std::string getImageLayerTarPath(
+    const std::string& layerPath);
 
 
-std::string getImageLayerRootfsPath(
+std::string getImageLayerTarPath(
     const std::string& storeDir,
     const std::string& layerId);
 
 
+std::string getImageArchiveTarPath(
+    const std::string& discoveryDir,
+    const std::string& name);
+
+
 std::string getStoredImagesPath(const std::string& storeDir);
 
 } // namespace paths {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
index a239b97..d012ae4 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -14,34 +14,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <string>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-#include <stout/os.hpp>
-
-#include <process/check.hpp>
-#include <process/collect.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include "common/status_utils.hpp"
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
 
 #include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
 
-using std::pair;
-using std::string;
-using std::tuple;
-using std::vector;
-
-using process::Failure;
-using process::Future;
 using process::Owned;
-using process::Subprocess;
 
 namespace mesos {
 namespace internal {
@@ -68,115 +48,6 @@ Try<Owned<Puller>> Puller::create(const Flags& flags)
   return puller.get();
 }
 
-
-Future<Nothing> untar(const string& file, const string& directory)
-{
-  const vector<string> argv = {
-    "tar",
-    "-C",
-    directory,
-    "-x",
-    "-f",
-    file
-  };
-
-  Try<Subprocess> s = subprocess(
-      "tar",
-      argv,
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PIPE());
-
-  if (s.isError()) {
-    return Failure("Failed to execute the subprocess: " + s.error());
-  }
-
-  return await(
-      s.get().status(),
-      process::io::read(s.get().err().get()))
-    .then([](const tuple<
-        Future<Option<int>>,
-        Future<string>>& t) -> Future<Nothing> {
-      Future<Option<int>> status = std::get<0>(t);
-      if (!status.isReady()) {
-        return Failure(
-          "Failed to get the exit status of the subprocess: " +
-          (status.isFailed() ? status.failure() : "discarded"));
-      }
-
-      Future<string> error = std::get<1>(t);
-      if (!error.isReady()) {
-        return Failure(
-          "Failed to read stderr from the subprocess: " +
-          (error.isFailed() ? error.failure() : "discarded"));
-      }
-
-      if (status->isNone()) {
-        return Failure("Failed to reap the subprocess");
-      }
-
-      if (status->get() != 0) {
-        return Failure(
-            "Unexpected result from the subprocess: " +
-            WSTRINGIFY(status->get()) +
-            ", stderr='" + error.get() + "'");
-      }
-
-      return Nothing();
-    });
-}
-
-
-Future<pair<string, string>> untarLayer(
-    const string& file,
-    const string& directory,
-    const string& layerId)
-{
-  // We untar the layer from source into a directory, then move the layer
-  // into store. We do this instead of untarring directly to store to make
-  // sure we don't end up with partially untarred layer rootfs.
-  const string localRootfsPath =
-    paths::getImageArchiveLayerRootfsPath(directory, layerId);
-
-  // Image layer has been untarred but is not present in the store directory.
-  if (os::exists(localRootfsPath)) {
-    LOG(WARNING) << "Image layer '" << layerId << "' rootfs present in staging "
-                 << "directory but not in store directory '"
-                 << localRootfsPath << "'. Removing staged rootfs and untarring"
-                 << "layer again.";
-
-    Try<Nothing> rmdir = os::rmdir(localRootfsPath);
-    if (rmdir.isError()) {
-      return Failure(
-          "Failed to remove incomplete staged rootfs for layer "
-          "'" + layerId + "': " + rmdir.error());
-    }
-  }
-
-  Try<Nothing> mkdir = os::mkdir(localRootfsPath);
-  if (mkdir.isError()) {
-    return Failure(
-        "Failed to create rootfs path '" + localRootfsPath + "'"
-        ": " + mkdir.error());
-  }
-
-  // The tar file will be removed when the staging directory is removed.
-  return untar(file, localRootfsPath)
-    .then([directory, layerId]() -> Future<pair<string, string>> {
-      const string layerPath =
-        paths::getImageArchiveLayerPath(directory, layerId);
-
-      if (!os::exists(layerPath)) {
-        return Failure(
-            "Failed to find the rootfs path after extracting layer"
-            " '" + layerId + "'");
-      }
-
-      return pair<string, string>(layerId, layerPath);
-    });
-}
-
-
 } // namespace docker {
 } // namespace slave {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
index 5b2d72c..51894dd 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -17,12 +17,9 @@
 #ifndef __PROVISIONER_DOCKER_PULLER_HPP__
 #define __PROVISIONER_DOCKER_PULLER_HPP__
 
-#include <list>
-#include <utility>
+#include <string>
+#include <vector>
 
-#include <stout/duration.hpp>
-#include <stout/option.hpp>
-#include <stout/path.hpp>
 #include <stout/try.hpp>
 
 #include <process/future.hpp>
@@ -47,47 +44,17 @@ public:
   /**
    * Pull a Docker image layers into the specified directory, and
    * return the list of layer ids in that image in the right
-   * dependency order, and also return the directory where
-   * the puller puts its changeset.
+   * dependency order (i.e., base images are at the front).
    *
-   * @param name The name of the image.
+   * @param reference The docker image reference.
    * @param directory The target directory to store the layers.
-   * @return list of layers maped to its local directory ordered by its
-   *         dependency.
+   * @return an ordered list of layer ids.
    */
-  virtual process::Future<std::list<std::pair<std::string, std::string>>> pull(
+  virtual process::Future<std::vector<std::string>> pull(
       const ::docker::spec::ImageReference& reference,
-      const Path& directory) = 0;
+      const std::string& directory) = 0;
 };
 
-
-/**
- * Untars(extracts) the tar file(input param) to the given output directory.
- *
- * @param file tar file to be extracted.
- * @param directory target directory for extracting the tar file.
- */
-process::Future<Nothing> untar(
-    const std::string& file,
-    const std::string& directory);
-
-
-/**
- * Untars a tarred layer changeset into staging directory with the
- * directory structure:
- *    |--staging directory
- *        |-- <layer_id>
- *            |-- rootfs
- *
- * @param file path to the tar file holding the Docker layer.
- * @param directory staging directory.
- * @return layer Id mapping to the rootfs path of the layer.
- */
-process::Future<std::pair<std::string, std::string>> untarLayer(
-    const std::string& file,
-    const std::string& directory,
-    const std::string& layerId);
-
 } // namespace docker {
 } // namespace slave {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
index 3fcf147..d09e2ec 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
@@ -14,25 +14,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
-
-#include <list>
-
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
 
-#include "common/status_utils.hpp"
+#include <stout/os/mkdir.hpp>
+#include <stout/os/rm.hpp>
+
+#include "common/command_utils.hpp"
 
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
 
 namespace http = process::http;
 namespace spec = docker::spec;
 
 using std::list;
-using std::pair;
 using std::string;
 using std::vector;
 
@@ -41,7 +39,6 @@ using process::Future;
 using process::Owned;
 using process::Process;
 using process::Promise;
-using process::Subprocess;
 
 namespace mesos {
 namespace internal {
@@ -56,37 +53,37 @@ class RegistryPullerProcess : public Process<RegistryPullerProcess>
 public:
   static Try<Owned<RegistryPullerProcess>> create(const Flags& flags);
 
-  process::Future<list<pair<string, string>>> pull(
+  process::Future<vector<string>> pull(
       const spec::ImageReference& reference,
-      const Path& directory);
+      const string& directory);
 
 private:
-  explicit RegistryPullerProcess(
+  RegistryPullerProcess(
       const Owned<RegistryClient>& registry,
       const Duration& timeout);
 
-  Future<pair<string, string>> downloadLayer(
-      const spec::ImageReference& reference,
-      const Path& directory,
-      const string& blobSum,
-      const string& id);
-
-  Future<list<pair<string, string>>> downloadLayers(
+  Future<vector<string>> downloadLayers(
       const spec::v2::ImageManifest& manifest,
       const spec::ImageReference& reference,
-      const Path& downloadDir);
+      const string& directory);
 
-  process::Future<list<pair<string, string>>> _pull(
+  Future<Nothing> downloadLayer(
       const spec::ImageReference& reference,
-      const Path& downloadDir);
+      const string& directory,
+      const string& blobSum,
+      const string& layerId);
 
-  Future<list<pair<string, string>>> untarLayers(
-    const Future<list<pair<string, string>>>& layerFutures,
-    const Path& downloadDir);
+  Future<vector<string>> untarLayers(
+      const string& directory,
+      const vector<string>& layerIds);
+
+  Future<Nothing> untarLayer(
+      const string& directory,
+      const string& layerId);
 
   Owned<RegistryClient> registryClient_;
   const Duration pullTimeout_;
-  hashmap<string, Owned<Promise<pair<string, string>>>> downloadTracker_;
+  hashmap<string, Owned<Promise<Nothing>>> downloadTracker_;
 
   RegistryPullerProcess(const RegistryPullerProcess&) = delete;
   RegistryPullerProcess& operator=(const RegistryPullerProcess&) = delete;
@@ -106,29 +103,29 @@ Try<Owned<Puller>> RegistryPuller::create(const Flags& flags)
 }
 
 
-RegistryPuller::RegistryPuller(const Owned<RegistryPullerProcess>& process)
-  : process_(process)
+RegistryPuller::RegistryPuller(Owned<RegistryPullerProcess> _process)
+  : process(_process)
 {
-  spawn(CHECK_NOTNULL(process_.get()));
+  spawn(CHECK_NOTNULL(process.get()));
 }
 
 
 RegistryPuller::~RegistryPuller()
 {
-  terminate(process_.get());
-  process::wait(process_.get());
+  terminate(process.get());
+  process::wait(process.get());
 }
 
 
-Future<list<pair<string, string>>> RegistryPuller::pull(
+Future<vector<string>> RegistryPuller::pull(
     const spec::ImageReference& reference,
-    const Path& downloadDir)
+    const string& directory)
 {
   return dispatch(
-      process_.get(),
+      process.get(),
       &RegistryPullerProcess::pull,
       reference,
-      downloadDir);
+      directory);
 }
 
 
@@ -173,9 +170,58 @@ RegistryPullerProcess::RegistryPullerProcess(
     pullTimeout_(timeout) {}
 
 
-Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
+Future<vector<string>> RegistryPullerProcess::pull(
+    const spec::ImageReference& reference,
+    const string& directory)
+{
+  // TODO(jojy): Have one outgoing manifest request per image.
+  return registryClient_->getManifest(reference)
+    .then(process::defer(self(), [=](const spec::v2::ImageManifest& manifest) {
+      return downloadLayers(manifest, reference, directory);
+    }))
+    .then(process::defer(self(), [=](const vector<string>& layerIds) {
+      return untarLayers(directory, layerIds);
+    }))
+    .after(pullTimeout_, [reference](Future<vector<string>> future) {
+      future.discard();
+      return Failure("Timed out");
+    });
+}
+
+
+Future<vector<string>> RegistryPullerProcess::downloadLayers(
+    const spec::v2::ImageManifest& manifest,
     const spec::ImageReference& reference,
-    const Path& directory,
+    const string& directory)
+{
+  list<Future<Nothing>> futures;
+  vector<string> layerIds;
+
+  CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
+
+  for (int i = 0; i < manifest.fslayers_size(); i++) {
+    CHECK(manifest.history(i).has_v1());
+
+    layerIds.push_back(manifest.history(i).v1().id());
+
+    futures.push_back(downloadLayer(
+        reference,
+        directory,
+        manifest.fslayers(i).blobsum(),
+        manifest.history(i).v1().id()));
+  }
+
+  // TODO(jojy): Delete downloaded files in the directory on discard and
+  // failure?
+  // TODO(jojy): Iterate through the futures and log the failed future.
+  return collect(futures)
+    .then([layerIds]() { return layerIds; });
+}
+
+
+Future<Nothing> RegistryPullerProcess::downloadLayer(
+    const spec::ImageReference& reference,
+    const string& directory,
     const string& blobSum,
     const string& layerId)
 {
@@ -189,8 +235,7 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
     return downloadTracker_.at(layerId)->future();
   }
 
-  Owned<Promise<pair<string, string>>> downloadPromise(
-      new Promise<pair<string, string>>());
+  Owned<Promise<Nothing>> downloadPromise(new Promise<Nothing>());
 
   downloadTracker_.insert({layerId, downloadPromise});
 
@@ -216,7 +261,7 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
             downloadPromise->fail(
                 "Failed to download layer '" + layerId + "': no content");
           } else {
-            downloadPromise->set({layerId, downloadFile});
+            downloadPromise->set(Nothing());
           }
         }));
 
@@ -224,71 +269,51 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
 }
 
 
-Future<list<pair<string, string>>> RegistryPullerProcess::pull(
-    const spec::ImageReference& reference,
-    const Path& directory)
+Future<vector<string>> RegistryPullerProcess::untarLayers(
+    const string& directory,
+    const vector<string>& layerIds)
 {
-  // TODO(jojy): Have one outgoing manifest request per image.
-  return registryClient_->getManifest(reference)
-    .then(process::defer(self(), [this, directory, reference](
-        const spec::v2::ImageManifest& manifest) {
-      return downloadLayers(manifest, reference, directory);
-    }))
-    .then(process::defer(self(), [this, directory](
-        const Future<list<pair<string, string>>>& layerFutures)
-        -> Future<list<pair<string, string>>> {
-      return untarLayers(layerFutures, directory);
-    }))
-    .after(pullTimeout_, [reference](
-        Future<list<pair<string, string>>> future) {
-      future.discard();
+  list<Future<Nothing>> futures;
+  foreach (const string& layerId, layerIds) {
+    VLOG(1) << "Untarring layer '" << layerId
+            << "' downloaded from registry to directory '"
+            << directory << "'";
 
-      return Failure("Timed out");
-    });
-}
-
-
-Future<list<pair<string, string>>> RegistryPullerProcess::downloadLayers(
-    const spec::v2::ImageManifest& manifest,
-    const spec::ImageReference& reference,
-    const Path& directory)
-{
-  list<Future<pair<string, string>>> downloadFutures;
-
-  CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
-
-  for (int i = 0; i < manifest.fslayers_size(); i++) {
-    CHECK(manifest.history(i).has_v1());
-
-    downloadFutures.push_back(
-        downloadLayer(reference,
-                      directory,
-                      manifest.fslayers(i).blobsum(),
-                      manifest.history(i).v1().id()));
+    futures.emplace_back(untarLayer(directory, layerId));
   }
 
-  // TODO(jojy): Delete downloaded files in the directory on discard and
-  // failure?
-  // TODO(jojy): Iterate through the futures and log the failed future.
-  return collect(downloadFutures);
+  return collect(futures)
+    .then([layerIds]() { return layerIds; });
 }
 
 
-Future<list<pair<string, string>>> RegistryPullerProcess::untarLayers(
-    const Future<list<pair<string, string>>>& layerFutures,
-    const Path& directory)
+Future<Nothing> RegistryPullerProcess::untarLayer(
+    const string& directory,
+    const string& layerId)
 {
-  list<Future<pair<string, string>>> untarFutures;
-
-  pair<string, string> layerInfo;
-  foreach (layerInfo, layerFutures.get()) {
-    VLOG(1) << "Untarring layer '" << layerInfo.first
-            << "' downloaded from registry to directory '" << directory << "'";
-    untarFutures.emplace_back(
-        untarLayer(layerInfo.second, directory, layerInfo.first));
+  const string layerPath = path::join(directory, layerId);
+  const string tar = path::join(directory, layerId + ".tar");
+  const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+
+  Try<Nothing> mkdir = os::mkdir(rootfs);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" + rootfs + "'"
+        ": " + mkdir.error());
   }
 
-  return collect(untarFutures);
+  return command::untar(Path(tar), Path(rootfs))
+    .then([tar]() -> Future<Nothing> {
+      // Remove the tar after the extraction.
+      Try<Nothing> rm = os::rm(tar);
+      if (rm.isError()) {
+        return Failure(
+          "Failed to remove '" + tar + "' "
+          "after extraction: " + rm.error());
+      }
+
+      return Nothing();
+    });
 }
 
 } // namespace docker {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
index bccbac1..c429df9 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
@@ -17,21 +17,14 @@
 #ifndef __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
 #define __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
 
-#include <list>
-#include <string>
-#include <utility>
+#include <process/owned.hpp>
 
-#include <stout/duration.hpp>
-#include <stout/path.hpp>
-
-#include <process/future.hpp>
-#include <process/http.hpp>
-#include <process/process.hpp>
-
-#include <mesos/docker/spec.hpp>
+#include <stout/try.hpp>
 
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
 
+#include "slave/flags.hpp"
+
 namespace mesos {
 namespace internal {
 namespace slave {
@@ -41,36 +34,33 @@ namespace docker {
 class RegistryPullerProcess;
 
 /*
- * Pulls an image from registry.
+ * Pulls an image from docker registry.
  */
 class RegistryPuller : public Puller
 {
 public:
-  /**
-   * Factory method for creating RegistryPuller.
-   */
   static Try<process::Owned<Puller>> create(const Flags& flags);
 
   ~RegistryPuller();
 
   /**
-   * Pulls an image into a download directory. This image could consist
-   * multiple layers of blobs.
+   * Pulls an image into a download directory. This image could
+   * consist of multiple layers of blobs.
    *
    * @param reference local name of the image.
-   * @param downloadDir path to which the layers should be downloaded.
+   * @param directory path to which the layers will be downloaded.
    */
-  process::Future<std::list<std::pair<std::string, std::string>>> pull(
+  process::Future<std::vector<std::string>> pull(
       const ::docker::spec::ImageReference& reference,
-      const Path& downloadDir);
+      const std::string& directory);
 
 private:
-  RegistryPuller(const process::Owned<RegistryPullerProcess>& process);
-
-  process::Owned<RegistryPullerProcess> process_;
+  RegistryPuller(process::Owned<RegistryPullerProcess> _process);
 
   RegistryPuller(const RegistryPuller&) = delete;
   RegistryPuller& operator=(const RegistryPuller&) = delete;
+
+  process::Owned<RegistryPullerProcess> process;
 };
 
 } // namespace docker {

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index 2f1d3e0..d802a79 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <list>
+#include <string>
 #include <vector>
 
 #include <glog/logging.h>
@@ -26,12 +26,9 @@
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
 
 #include <mesos/docker/spec.hpp>
 
-#include "common/status_utils.hpp"
-
 #include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/store.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
@@ -42,7 +39,6 @@ using namespace process;
 namespace spec = docker::spec;
 
 using std::list;
-using std::pair;
 using std::string;
 using std::vector;
 
@@ -76,16 +72,15 @@ private:
   Future<ImageInfo> __get(const Image& image);
 
   Future<vector<string>> moveLayers(
-      const list<pair<string, string>>& layerPaths);
-
-  Future<Image> storeImage(
-      const spec::ImageReference& reference,
+      const string& staging,
       const vector<string>& layerIds);
 
   Future<Nothing> moveLayer(
-      const pair<string, string>& layerPath);
+      const string& staging,
+      const string& layerId);
 
   const Flags flags;
+
   Owned<MetadataManager> metadataManager;
   Owned<Puller> puller;
   hashmap<string, Owned<Promise<Image>>> pulling;
@@ -136,7 +131,7 @@ Try<Owned<slave::Store>> Store::create(
 }
 
 
-Store::Store(const Owned<StoreProcess>& _process) : process(_process)
+Store::Store(Owned<StoreProcess> _process) : process(_process)
 {
   spawn(CHECK_NOTNULL(process.get()));
 }
@@ -161,6 +156,12 @@ Future<ImageInfo> Store::get(const mesos::Image& image)
 }
 
 
+Future<Nothing> StoreProcess::recover()
+{
+  return metadataManager->recover();
+}
+
+
 Future<ImageInfo> StoreProcess::get(const mesos::Image& image)
 {
   if (image.type() != mesos::Image::DOCKER) {
@@ -199,19 +200,23 @@ Future<Image> StoreProcess::_get(
     os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
 
   if (staging.isError()) {
-    return Failure("Failed to create a staging directory");
+    return Failure("Failed to create a staging directory: " + staging.error());
   }
 
-  const string imageReference = stringify(reference);
+  // If there is already a pull in progress for the given 'name', we
+  // will skip the additional pull.
+  const string name = stringify(reference);
 
-  if (!pulling.contains(imageReference)) {
+  if (!pulling.contains(name)) {
     Owned<Promise<Image>> promise(new Promise<Image>());
 
-    Future<Image> future = puller->pull(reference, Path(staging.get()))
-      .then(defer(self(), &Self::moveLayers, lambda::_1))
-      .then(defer(self(), &Self::storeImage, reference, lambda::_1))
+    Future<Image> future = puller->pull(reference, staging.get())
+      .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1))
+      .then(defer(self(), [=](const vector<string>& layerIds) {
+        return metadataManager->put(reference, layerIds);
+      }))
       .onAny(defer(self(), [=](const Future<Image>&) {
-        pulling.erase(imageReference);
+        pulling.erase(name);
 
         Try<Nothing> rmdir = os::rmdir(staging.get());
         if (rmdir.isError()) {
@@ -221,12 +226,12 @@ Future<Image> StoreProcess::_get(
       }));
 
     promise->associate(future);
-    pulling[imageReference] = promise;
+    pulling[name] = promise;
 
     return promise->future();
   }
 
-  return pulling[imageReference]->future();
+  return pulling[name]->future();
 }
 
 
@@ -234,11 +239,10 @@ Future<ImageInfo> StoreProcess::__get(const Image& image)
 {
   CHECK_LT(0, image.layer_ids_size());
 
-  vector<string> layerDirectories;
-  foreach (const string& layer, image.layer_ids()) {
-    layerDirectories.push_back(
-        paths::getImageLayerRootfsPath(
-            flags.docker_store_dir, layer));
+  vector<string> layerPaths;
+  foreach (const string& layerId, image.layer_ids()) {
+    layerPaths.push_back(
+        paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId));
   }
 
   // Read the manifest from the last layer because all runtime config
@@ -259,78 +263,61 @@ Future<ImageInfo> StoreProcess::__get(const Image& image)
     return Failure("Failed to parse docker v1 manifest: " + v1.error());
   }
 
-  return ImageInfo{layerDirectories, v1.get()};
-}
-
-
-Future<Nothing> StoreProcess::recover()
-{
-  return metadataManager->recover();
+  return ImageInfo{layerPaths, v1.get()};
 }
 
 
 Future<vector<string>> StoreProcess::moveLayers(
-    const list<pair<string, string>>& layerPaths)
+    const string& staging,
+    const vector<string>& layerIds)
 {
   list<Future<Nothing>> futures;
-  foreach (const auto& layerPath, layerPaths) {
-    futures.push_back(moveLayer(layerPath));
+  foreach (const string& layerId, layerIds) {
+    futures.push_back(moveLayer(staging, layerId));
   }
 
   return collect(futures)
-    .then([layerPaths]() {
-      vector<string> layerIds;
-      foreach (const auto& layerPath, layerPaths) {
-        layerIds.push_back(layerPath.first);
-      }
-
-      return layerIds;
-    });
-}
-
-
-Future<Image> StoreProcess::storeImage(
-    const spec::ImageReference& reference,
-    const vector<string>& layerIds)
-{
-  return metadataManager->put(reference, layerIds);
+    .then([layerIds]() { return layerIds; });
 }
 
 
 Future<Nothing> StoreProcess::moveLayer(
-    const pair<string, string>& layerPath)
+    const string& staging,
+    const string& layerId)
 {
-  if (!os::exists(layerPath.second)) {
-    return Failure("Unable to find layer '" + layerPath.first + "' in '" +
-                   layerPath.second + "'");
+  const string source = path::join(staging, layerId);
+
+  // This is the case where the puller skips the pulling of the layer
+  // because the layer already exists in the store.
+  //
+  // TODO(jieyu): Verify that the layer is actually in the store.
+  if (!os::exists(source)) {
+    return Nothing();
   }
 
-  const string imageLayerPath =
-    paths::getImageLayerPath(flags.docker_store_dir, layerPath.first);
-
-  // If image layer path exists, we should remove it and make an empty
-  // directory, because os::rename can only have empty or non-existed
-  // directory as destination.
-  if (os::exists(imageLayerPath)) {
-    Try<Nothing> rmdir = os::rmdir(imageLayerPath);
-    if (rmdir.isError()) {
-      return Failure("Failed to remove existing layer: " + rmdir.error());
-    }
+  const string target = paths::getImageLayerPath(
+      flags.docker_store_dir,
+      layerId);
+
+  // NOTE: Since the layer id is supposed to be unique, if the layer
+  // already exists in the store, we'll skip the move since the two
+  // copies are expected to be the same.
+  if (os::exists(target)) {
+    return Nothing();
   }
 
-  Try<Nothing> mkdir = os::mkdir(imageLayerPath);
+  Try<Nothing> mkdir = os::mkdir(target);
   if (mkdir.isError()) {
-    return Failure("Failed to create layer path in store for id '" +
-                   layerPath.first + "': " + mkdir.error());
+    return Failure(
+        "Failed to create directory in store for layer '" +
+        layerId + "': " + mkdir.error());
   }
 
-  Try<Nothing> status = os::rename(
-      layerPath.second,
-      imageLayerPath);
-
-  if (status.isError()) {
-    return Failure("Failed to move layer '" + layerPath.first +
-                   "' to store directory: " + status.error());
+  Try<Nothing> rename = os::rename(source, target);
+  if (rename.isError()) {
+    return Failure(
+        "Failed to move layer from '" + source +
+        "' to '" + target + "': " + rename.error());
   }
 
   return Nothing();

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/store.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.hpp b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
index 880e216..1c2b149 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
@@ -53,7 +53,7 @@ public:
   virtual process::Future<ImageInfo> get(const mesos::Image& image);
 
 private:
-  explicit Store(const process::Owned<StoreProcess>& process);
+  explicit Store(process::Owned<StoreProcess> process);
 
   Store& operator=(const Store&) = delete; // Not assignable.
   Store(const Store&) = delete; // Not copyable.

http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/tests/containerizer/provisioner_docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp
index 4db6793..c6b1e69 100644
--- a/src/tests/containerizer/provisioner_docker_tests.cpp
+++ b/src/tests/containerizer/provisioner_docker_tests.cpp
@@ -766,12 +766,12 @@ TEST_F(RegistryClientTest, SimpleRegistryPuller)
   Try<Owned<Puller>> registryPuller = RegistryPuller::create(flags);
   ASSERT_SOME(registryPuller);
 
-  const Path registryPullerPath(os::getcwd());
+  const string registryPullerPath = os::getcwd();
 
   Try<spec::ImageReference> reference = spec::parseImageReference("busybox");
   ASSERT_SOME(reference);
 
-  Future<list<pair<string, string>>> registryPullerFuture =
+  Future<vector<string>> registryPullerFuture =
     registryPuller.get()->pull(reference.get(), registryPullerPath);
 
   const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
@@ -974,13 +974,17 @@ TEST_F(RegistryClientTest, SimpleRegistryPuller)
       blobResponseSize));
 
   AWAIT_ASSERT_READY(registryPullerFuture);
-  list<pair<string, string>> layers = registryPullerFuture.get();
+  vector<string> layers = registryPullerFuture.get();
   ASSERT_EQ(1u, layers.size());
-  ASSERT_EQ(layers.front().first,
+  ASSERT_EQ(layers.front(),
             "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
 
-  Try<string> blob = os::read(
-      path::join(layers.front().second, "rootfs", blobFile));
+  Try<string> blob = os::read(path::join(
+      registryPullerPath,
+      layers.front(),
+      "rootfs",
+      blobFile));
+
   ASSERT_SOME(blob);
   ASSERT_EQ(blob.get(), blobResponse);
 }
@@ -1154,16 +1158,16 @@ public:
 
   MOCK_METHOD2(
       pull,
-      Future<list<pair<string, string>>>(
+      Future<vector<string>>(
           const spec::ImageReference&,
-          const Path&));
+          const string&));
 
-  Future<list<pair<string, string>>> unmocked_pull(
-      const spec::ImageReference& name,
-      const Path& directory)
+  Future<vector<string>> unmocked_pull(
+      const spec::ImageReference& reference,
+      const string& directory)
   {
     // TODO(gilbert): Allow return list to be overridden.
-    return list<pair<string, string>>();
+    return vector<string>();
   }
 };
 
@@ -1184,10 +1188,12 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
 
   MockPuller* puller = new MockPuller();
   Future<Nothing> pull;
-  Promise<list<pair<string, string>>> promise;
+  Future<string> directory;
+  Promise<vector<string>> promise;
 
   EXPECT_CALL(*puller, pull(_, _))
     .WillOnce(testing::DoAll(FutureSatisfy(&pull),
+                             FutureArg<1>(&directory),
                              Return(promise.future())));
 
   Try<Owned<slave::Store>> store =
@@ -1200,11 +1206,12 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
 
   Future<slave::ImageInfo> imageInfo1 = store.get()->get(mesosImage);
   AWAIT_READY(pull);
+  AWAIT_READY(directory);
 
   // TODO(gilbert): Need a helper method to create test layers
   // which will allow us to set manifest so that we can add
   // checks here.
-  const string layerPath = path::join(os::getcwd(), "456");
+  const string layerPath = path::join(directory.get(), "456");
 
   Try<Nothing> mkdir = os::mkdir(layerPath);
   ASSERT_SOME(mkdir);
@@ -1220,7 +1227,7 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
   ASSERT_TRUE(imageInfo1.isPending());
   Future<slave::ImageInfo> imageInfo2 = store.get()->get(mesosImage);
 
-  const list<pair<string, string>> result = {{"456", layerPath}};
+  const vector<string> result = {"456"};
 
   ASSERT_TRUE(imageInfo2.isPending());
   promise.set(result);


[3/3] mesos git commit: Removed the restriction that /tmp needs to be writable in new rootfs.

Posted by ji...@apache.org.
Removed the restriction that /tmp needs to be writable in new rootfs.

Review: https://reviews.apache.org/r/43896


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14f070fd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14f070fd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14f070fd

Branch: refs/heads/master
Commit: 14f070fda25c98c0a8ba29da84c607f2dd86da6a
Parents: 66d0f44
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Feb 23 10:49:11 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:08 2016 -0800

----------------------------------------------------------------------
 src/linux/fs.cpp | 33 +++++++++++++++++++++------------
 1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/14f070fd/src/linux/fs.cpp
----------------------------------------------------------------------
diff --git a/src/linux/fs.cpp b/src/linux/fs.cpp
index 0df1942..7792f68 100644
--- a/src/linux/fs.cpp
+++ b/src/linux/fs.cpp
@@ -584,21 +584,25 @@ Try<Nothing> enter(const string& root)
     return Error("Failed to create devices: " + create.error());
   }
 
-  // Create a /tmp directory if it doesn't exist.
-  // TODO(idownes): Consider mounting a tmpfs to /tmp.
+  // Prepare /tmp in the new root. Note that we cannot assume that the
+  // new root is writable (i.e., it could be a read-only filesystem).
+  // Therefore, we always mount a tmpfs on /tmp in the new root so
+  // that we can create the mount point for the old root.
   if (!os::exists(path::join(root, "tmp"))) {
-    Try<Nothing> mkdir = os::mkdir(path::join(root, "tmp"));
-     if (mkdir.isError()) {
-       return Error("Failed to create /tmp in chroot: " + mkdir.error());
-     }
+    return Error("/tmp in chroot does not exist");
+  }
 
-     Try<Nothing> chmod = os::chmod(
-         path::join(root, "tmp"),
-         S_IRWXU | S_IRWXG | S_IRWXO | S_ISVTX);
+  // TODO(jieyu): Consider limiting the size of the tmpfs.
+  mount = fs::mount(
+      "tmpfs",
+      path::join(root, "tmp"),
+      "tmpfs",
+      MS_NOSUID | MS_NOEXEC | MS_NODEV,
+      "mode=1777");
 
-     if (chmod.isError()) {
-       return Error("Failed to set mode on /tmp: " + chmod.error());
-     }
+  if (mount.isError()) {
+    return Error("Failed to mount the temporary tmpfs at /tmp in new root: " +
+                 mount.error());
   }
 
   // Create a mount point for the old root.
@@ -661,6 +665,11 @@ Try<Nothing> enter(const string& root)
   // Check status when we stop using lazy umounts.
   os::rmdir(relativeOld);
 
+  Try<Nothing> unmount = fs::unmount("/tmp");
+  if (unmount.isError()) {
+    return Error("Failed to umount /tmp in the chroot: " + unmount.error());
+  }
+
   return Nothing();
 }
 


[2/3] mesos git commit: Used uri::Fetcher to pull docker images in docker registry puller.

Posted by ji...@apache.org.
Used uri::Fetcher to pull docker images in docker registry puller.

Review: https://reviews.apache.org/r/43860


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/66d0f449
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/66d0f449
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/66d0f449

Branch: refs/heads/master
Commit: 66d0f4496f146fb0c68d23e08d25a9af108c6218
Parents: caf6c02
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Feb 22 16:52:01 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:08 2016 -0800

----------------------------------------------------------------------
 docs/configuration.md                           |  17 -
 docs/endpoints/slave/state.json.md              |   2 -
 docs/endpoints/slave/state.md                   |   2 -
 .../mesos/provisioner/docker/puller.cpp         |   7 +-
 .../mesos/provisioner/docker/puller.hpp         |   7 +-
 .../provisioner/docker/registry_puller.cpp      | 319 +++---
 .../provisioner/docker/registry_puller.hpp      |   7 +-
 .../mesos/provisioner/docker/store.cpp          |  11 +-
 src/slave/flags.cpp                             |  10 -
 src/slave/flags.hpp                             |   2 -
 src/slave/http.cpp                              |   2 -
 .../containerizer/provisioner_docker_tests.cpp  | 978 +------------------
 12 files changed, 220 insertions(+), 1144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b04e873..2353e78 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1057,15 +1057,6 @@ containerizer.
 </tr>
 <tr>
   <td>
-    --docker_auth_server=VALUE
-  </td>
-  <td>
-Docker authentication server used to authenticate with Docker registry
-(default: https://auth.docker.io)
-  </td>
-</tr>
-<tr>
-  <td>
     --[no-]docker_kill_orphans
   </td>
   <td>
@@ -1090,14 +1081,6 @@ recovers.
 </tr>
 <tr>
   <td>
-    --docker_puller_timeout=VALUE
-  </td>
-  <td>
-Timeout in seconds for pulling images from the Docker registry (default: 60secs)
-  </td>
-</tr>
-<tr>
-  <td>
     --docker_registry=VALUE
   </td>
   <td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/endpoints/slave/state.json.md
----------------------------------------------------------------------
diff --git a/docs/endpoints/slave/state.json.md b/docs/endpoints/slave/state.json.md
index 0a31159..a6403ee 100644
--- a/docs/endpoints/slave/state.json.md
+++ b/docs/endpoints/slave/state.json.md
@@ -80,12 +80,10 @@ Example (**Note**: this is not exhaustive):
          "work_dir" : "/tmp/mesos",
          "launcher_dir" : "/path/to/mesos/build/src",
          "registration_backoff_factor" : "1secs",
-         "docker_auth_server" : "https://auth.docker.io",
          "oversubscribed_resources_interval" : "15secs",
          "enforce_container_disk_quota" : "false",
          "container_disk_watch_interval" : "15secs",
          "disk_watch_interval" : "1mins",
-         "docker_puller_timeout" : "60",
          "cgroups_limit_swap" : "false",
          "hostname_lookup" : "true",
          "perf_duration" : "10secs",

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/endpoints/slave/state.md
----------------------------------------------------------------------
diff --git a/docs/endpoints/slave/state.md b/docs/endpoints/slave/state.md
index 1077f69..bdd555e 100644
--- a/docs/endpoints/slave/state.md
+++ b/docs/endpoints/slave/state.md
@@ -80,12 +80,10 @@ Example (**Note**: this is not exhaustive):
          "work_dir" : "/tmp/mesos",
          "launcher_dir" : "/path/to/mesos/build/src",
          "registration_backoff_factor" : "1secs",
-         "docker_auth_server" : "https://auth.docker.io",
          "oversubscribed_resources_interval" : "15secs",
          "enforce_container_disk_quota" : "false",
          "container_disk_watch_interval" : "15secs",
          "disk_watch_interval" : "1mins",
-         "docker_puller_timeout" : "60",
          "cgroups_limit_swap" : "false",
          "hostname_lookup" : "true",
          "perf_duration" : "10secs",

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
index d012ae4..ac9dae8 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -22,13 +22,16 @@
 #include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
 
 using process::Owned;
+using process::Shared;
 
 namespace mesos {
 namespace internal {
 namespace slave {
 namespace docker {
 
-Try<Owned<Puller>> Puller::create(const Flags& flags)
+Try<Owned<Puller>> Puller::create(
+    const Flags& flags,
+    const Shared<uri::Fetcher>& fetcher)
 {
   // TODO(tnachen): Support multiple registries in the puller.
   if (strings::startsWith(flags.docker_registry, "/")) {
@@ -40,7 +43,7 @@ Try<Owned<Puller>> Puller::create(const Flags& flags)
     return puller.get();
   }
 
-  Try<Owned<Puller>> puller = RegistryPuller::create(flags);
+  Try<Owned<Puller>> puller = RegistryPuller::create(flags, fetcher);
   if (puller.isError()) {
     return Error("Failed to create registry puller: " + puller.error());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
index 51894dd..191f3fc 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -24,9 +24,12 @@
 
 #include <process/future.hpp>
 #include <process/owned.hpp>
+#include <process/shared.hpp>
 
 #include <mesos/docker/spec.hpp>
 
+#include <mesos/uri/fetcher.hpp>
+
 #include "slave/flags.hpp"
 
 namespace mesos {
@@ -37,7 +40,9 @@ namespace docker {
 class Puller
 {
 public:
-  static Try<process::Owned<Puller>> create(const Flags& flags);
+  static Try<process::Owned<Puller>> create(
+      const Flags& flags,
+      const process::Shared<uri::Fetcher>& fetcher);
 
   virtual ~Puller() {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
index d09e2ec..e08a959 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
@@ -14,17 +14,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <glog/logging.h>
+
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
+#include <process/http.hpp>
 
+#include <stout/os/exists.hpp>
 #include <stout/os/mkdir.hpp>
 #include <stout/os/rm.hpp>
+#include <stout/os/write.hpp>
 
 #include "common/command_utils.hpp"
 
+#include "uri/schemes/docker.hpp"
+
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
 
 namespace http = process::http;
@@ -38,68 +44,77 @@ using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Process;
-using process::Promise;
+using process::Shared;
+
+using process::defer;
+using process::dispatch;
+using process::spawn;
+using process::wait;
 
 namespace mesos {
 namespace internal {
 namespace slave {
 namespace docker {
 
-using RegistryClient = registry::RegistryClient;
-
-
 class RegistryPullerProcess : public Process<RegistryPullerProcess>
 {
 public:
-  static Try<Owned<RegistryPullerProcess>> create(const Flags& flags);
+  RegistryPullerProcess(
+      const string& _storeDir,
+      const http::URL& _defaultRegistryUrl,
+      const Shared<uri::Fetcher>& _fetcher);
 
-  process::Future<vector<string>> pull(
+  Future<vector<string>> pull(
       const spec::ImageReference& reference,
       const string& directory);
 
 private:
-  RegistryPullerProcess(
-      const Owned<RegistryClient>& registry,
-      const Duration& timeout);
-
-  Future<vector<string>> downloadLayers(
-      const spec::v2::ImageManifest& manifest,
+  Future<vector<string>> _pull(
       const spec::ImageReference& reference,
       const string& directory);
 
-  Future<Nothing> downloadLayer(
+  Future<Nothing> fetchLayer(
       const spec::ImageReference& reference,
       const string& directory,
       const string& blobSum,
-      const string& layerId);
+      const spec::v1::ImageManifest& v1);
 
-  Future<vector<string>> untarLayers(
+  Future<Nothing> _fetchLayer(
       const string& directory,
-      const vector<string>& layerIds);
-
-  Future<Nothing> untarLayer(
-      const string& directory,
-      const string& layerId);
-
-  Owned<RegistryClient> registryClient_;
-  const Duration pullTimeout_;
-  hashmap<string, Owned<Promise<Nothing>>> downloadTracker_;
+      const string& blobSum,
+      const spec::v1::ImageManifest& v1);
 
   RegistryPullerProcess(const RegistryPullerProcess&) = delete;
   RegistryPullerProcess& operator=(const RegistryPullerProcess&) = delete;
+
+  const string storeDir;
+
+  // If the user does not specify the registry url in the image
+  // reference, this registry url will be used as the default.
+  const http::URL defaultRegistryUrl;
+
+  Shared<uri::Fetcher> fetcher;
 };
 
 
-Try<Owned<Puller>> RegistryPuller::create(const Flags& flags)
+Try<Owned<Puller>> RegistryPuller::create(
+    const Flags& flags,
+    const Shared<uri::Fetcher>& fetcher)
 {
-  Try<Owned<RegistryPullerProcess>> process =
-    RegistryPullerProcess::create(flags);
-
-  if (process.isError()) {
-    return Error(process.error());
+  Try<http::URL> defaultRegistryUrl = http::URL::parse(flags.docker_registry);
+  if (defaultRegistryUrl.isError()) {
+    return Error(
+        "Failed to parse the default Docker registry: " +
+        defaultRegistryUrl.error());
   }
 
-  return Owned<Puller>(new RegistryPuller(process.get()));
+  Owned<RegistryPullerProcess> process(
+      new RegistryPullerProcess(
+          flags.docker_store_dir,
+          defaultRegistryUrl.get(),
+          fetcher));
+
+  return Owned<Puller>(new RegistryPuller(process));
 }
 
 
@@ -113,7 +128,7 @@ RegistryPuller::RegistryPuller(Owned<RegistryPullerProcess> _process)
 RegistryPuller::~RegistryPuller()
 {
   terminate(process.get());
-  process::wait(process.get());
+  wait(process.get());
 }
 
 
@@ -129,177 +144,159 @@ Future<vector<string>> RegistryPuller::pull(
 }
 
 
-Try<Owned<RegistryPullerProcess>> RegistryPullerProcess::create(
-    const Flags& flags)
-{
-  Result<double> timeoutSecs = numify<double>(flags.docker_puller_timeout_secs);
-  if ((timeoutSecs.isError()) || (timeoutSecs.get() <= 0)) {
-    return Error(
-        "Failed to create registry puller - invalid timeout value: " +
-        flags.docker_puller_timeout_secs);
-  }
-
-  Try<http::URL> registryUrl = http::URL::parse(flags.docker_registry);
-  if (registryUrl.isError()) {
-    return Error("Failed to parse Docker registry: " + registryUrl.error());
-  }
-
-  Try<http::URL> authServerUrl = http::URL::parse(flags.docker_auth_server);
-  if (authServerUrl.isError()) {
-    return Error("Failed to parse Docker auth server: " +
-                 authServerUrl.error());
-  }
-
-  Try<Owned<RegistryClient>> registry = RegistryClient::create(
-      registryUrl.get(), authServerUrl.get());
-
-  if (registry.isError()) {
-    return Error("Failed to create registry client: " + registry.error());
-  }
-
-  return Owned<RegistryPullerProcess>(new RegistryPullerProcess(
-      registry.get(),
-      Seconds(timeoutSecs.get())));
-}
-
-
 RegistryPullerProcess::RegistryPullerProcess(
-    const Owned<RegistryClient>& registry,
-    const Duration& timeout)
-  : registryClient_(registry),
-    pullTimeout_(timeout) {}
+    const string& _storeDir,
+    const http::URL& _defaultRegistryUrl,
+    const Shared<uri::Fetcher>& _fetcher)
+  : storeDir(_storeDir),
+    defaultRegistryUrl(_defaultRegistryUrl),
+    fetcher(_fetcher) {}
 
 
 Future<vector<string>> RegistryPullerProcess::pull(
     const spec::ImageReference& reference,
     const string& directory)
 {
-  // TODO(jojy): Have one outgoing manifest request per image.
-  return registryClient_->getManifest(reference)
-    .then(process::defer(self(), [=](const spec::v2::ImageManifest& manifest) {
-      return downloadLayers(manifest, reference, directory);
-    }))
-    .then(process::defer(self(), [=](const vector<string>& layerIds) {
-      return untarLayers(directory, layerIds);
-    }))
-    .after(pullTimeout_, [reference](Future<vector<string>> future) {
-      future.discard();
-      return Failure("Timed out");
-    });
+  // TODO(jieyu): Consider introducing a 'normalize' function to
+  // normalize 'reference' here. For instance, we need to add
+  // 'library/' prefix if the user does not specify a repository.
+  // Also consider merging the registry generation logic below into
+  // 'normalize'.
+
+  URI manifestUri;
+  if (reference.has_registry()) {
+    // TODO(jieyu): The user-specified registry might contain a port. We
+    // need to parse it and set the 'scheme' and 'port' accordingly.
+    manifestUri = uri::docker::manifest(
+        reference.repository(),
+        (reference.has_tag() ? reference.tag() : "latest"),
+        reference.registry());
+  } else {
+    const string registry = defaultRegistryUrl.domain.isSome()
+      ? defaultRegistryUrl.domain.get()
+      : stringify(defaultRegistryUrl.ip.get());
+
+    const Option<int> port = defaultRegistryUrl.port.isSome()
+      ? static_cast<int>(defaultRegistryUrl.port.get())
+      : Option<int>();
+
+    manifestUri = uri::docker::manifest(
+        reference.repository(),
+        (reference.has_tag() ? reference.tag() : "latest"),
+        registry,
+        defaultRegistryUrl.scheme,
+        port);
+  }
+
+  return fetcher->fetch(manifestUri, directory)
+    .then(defer(self(), &Self::_pull, reference, directory));
 }
 
 
-Future<vector<string>> RegistryPullerProcess::downloadLayers(
-    const spec::v2::ImageManifest& manifest,
+Future<vector<string>> RegistryPullerProcess::_pull(
     const spec::ImageReference& reference,
     const string& directory)
 {
+  Try<string> _manifest = os::read(path::join(directory, "manifest"));
+  if (_manifest.isError()) {
+    return Failure("Failed to read the manifest: " + _manifest.error());
+  }
+
+  Try<spec::v2::ImageManifest> manifest = spec::v2::parse(_manifest.get());
+  if (manifest.isError()) {
+    return Failure("Failed to parse the manifest: " + manifest.error());
+  }
+
   list<Future<Nothing>> futures;
   vector<string> layerIds;
 
-  CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
+  CHECK_EQ(manifest->fslayers_size(), manifest->history_size());
 
-  for (int i = 0; i < manifest.fslayers_size(); i++) {
-    CHECK(manifest.history(i).has_v1());
+  for (int i = 0; i < manifest->fslayers_size(); i++) {
+    CHECK(manifest->history(i).has_v1());
 
-    layerIds.push_back(manifest.history(i).v1().id());
+    layerIds.push_back(manifest->history(i).v1().id());
 
-    futures.push_back(downloadLayer(
+    futures.push_back(fetchLayer(
         reference,
         directory,
-        manifest.fslayers(i).blobsum(),
-        manifest.history(i).v1().id()));
+        manifest->fslayers(i).blobsum(),
+        manifest->history(i).v1()));
   }
 
-  // TODO(jojy): Delete downloaded files in the directory on discard and
-  // failure?
-  // TODO(jojy): Iterate through the futures and log the failed future.
   return collect(futures)
     .then([layerIds]() { return layerIds; });
 }
 
 
-Future<Nothing> RegistryPullerProcess::downloadLayer(
+Future<Nothing> RegistryPullerProcess::fetchLayer(
     const spec::ImageReference& reference,
     const string& directory,
     const string& blobSum,
-    const string& layerId)
+    const spec::v1::ImageManifest& v1)
 {
-  VLOG(1) << "Downloading layer '"  << layerId
-          << "' for image '" << stringify(reference) << "'";
-
-  if (downloadTracker_.contains(layerId)) {
-    VLOG(1) << "Download already in progress for image '"
-            << stringify(reference) << "', layer '" << layerId << "'";
-
-    return downloadTracker_.at(layerId)->future();
+  // Check whether the layer is already in the store. If so, skip the
+  // unnecessary fetch.
+  if (os::exists(paths::getImageLayerPath(storeDir, v1.id()))) {
+    return Nothing();
   }
 
-  Owned<Promise<Nothing>> downloadPromise(new Promise<Nothing>());
-
-  downloadTracker_.insert({layerId, downloadPromise});
-
-  const Path downloadFile(path::join(directory, layerId + ".tar"));
-
-  registryClient_->getBlob(
-      reference,
-      blobSum,
-      downloadFile)
-    .onAny(process::defer(
-        self(),
-        [this, layerId, downloadPromise, downloadFile](
-            const Future<size_t>& future) {
-          downloadTracker_.erase(layerId);
-
-          if (!future.isReady()) {
-              downloadPromise->fail(
-                  "Failed to download layer '" + layerId + "': " +
-                  (future.isFailed() ? future.failure() : "future discarded"));
-          } else if (future.get() == 0) {
-            // We don't expect Docker registry to return empty response
-            // even with empty layers.
-            downloadPromise->fail(
-                "Failed to download layer '" + layerId + "': no content");
-          } else {
-            downloadPromise->set(Nothing());
-          }
-        }));
-
-  return downloadPromise->future();
-}
-
-
-Future<vector<string>> RegistryPullerProcess::untarLayers(
-    const string& directory,
-    const vector<string>& layerIds)
-{
-  list<Future<Nothing>> futures;
-  foreach (const string& layerId, layerIds) {
-    VLOG(1) << "Untarring layer '" << layerId
-            << "' downloaded from registry to directory '"
-            << directory << "'";
-
-    futures.emplace_back(untarLayer(directory, layerId));
+  VLOG(1) << "Fetching layer '" << v1.id() << "' for image '"
+          << reference << "'";
+
+  URI blobUri;
+  if (reference.has_registry()) {
+    // TODO(jieyu): The user-specified registry might contain a port. We
+    // need to parse it and set the 'scheme' and 'port' accordingly.
+    blobUri = uri::docker::blob(
+        reference.repository(),
+        blobSum,
+        reference.registry());
+  } else {
+    const string registry = defaultRegistryUrl.domain.isSome()
+      ? defaultRegistryUrl.domain.get()
+      : stringify(defaultRegistryUrl.ip.get());
+
+    const Option<int> port = defaultRegistryUrl.port.isSome()
+      ? static_cast<int>(defaultRegistryUrl.port.get())
+      : Option<int>();
+
+    blobUri = uri::docker::blob(
+        reference.repository(),
+        blobSum,
+        registry,
+        defaultRegistryUrl.scheme,
+        port);
   }
 
-  return collect(futures)
-    .then([layerIds]() { return layerIds; });
+  return fetcher->fetch(blobUri, directory)
+    .then(defer(self(), &Self::_fetchLayer, directory, blobSum, v1));
 }
 
 
-Future<Nothing> RegistryPullerProcess::untarLayer(
+Future<Nothing> RegistryPullerProcess::_fetchLayer(
     const string& directory,
-    const string& layerId)
+    const string& blobSum,
+    const spec::v1::ImageManifest& v1)
 {
-  const string layerPath = path::join(directory, layerId);
-  const string tar = path::join(directory, layerId + ".tar");
+  const string layerPath = path::join(directory, v1.id());
+  const string tar = path::join(directory, blobSum);
   const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+  const string manifest = paths::getImageLayerManifestPath(layerPath);
 
-  Try<Nothing> mkdir = os::mkdir(rootfs);
+  // NOTE: This will create 'layerPath' as well.
+  Try<Nothing> mkdir = os::mkdir(rootfs, true);
   if (mkdir.isError()) {
     return Failure(
-        "Failed to create directory '" + rootfs + "'"
-        ": " + mkdir.error());
+        "Failed to create rootfs directory '" + rootfs + "' "
+        "for layer '" + v1.id() + "': " + mkdir.error());
+  }
+
+  Try<Nothing> write = os::write(manifest, stringify(JSON::protobuf(v1)));
+  if (write.isError()) {
+    return Failure(
+        "Failed to save the layer manifest for layer '" +
+        v1.id() + "': " + write.error());
   }
 
   return command::untar(Path(tar), Path(rootfs))
@@ -308,8 +305,8 @@ Future<Nothing> RegistryPullerProcess::untarLayer(
       Try<Nothing> rm = os::rm(tar);
       if (rm.isError()) {
         return Failure(
-          "Failed to remove '" + tar + "' "
-          "after extraction: " + rm.error());
+            "Failed to remove '" + tar + "' "
+            "after extraction: " + rm.error());
       }
 
       return Nothing();

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
index c429df9..bbd6005 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
@@ -18,9 +18,12 @@
 #define __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
 
 #include <process/owned.hpp>
+#include <process/shared.hpp>
 
 #include <stout/try.hpp>
 
+#include <mesos/uri/fetcher.hpp>
+
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
 
 #include "slave/flags.hpp"
@@ -39,7 +42,9 @@ class RegistryPullerProcess;
 class RegistryPuller : public Puller
 {
 public:
-  static Try<process::Owned<Puller>> create(const Flags& flags);
+  static Try<process::Owned<Puller>> create(
+      const Flags& flags,
+      const process::Shared<uri::Fetcher>& fetcher);
 
   ~RegistryPuller();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index d802a79..47d3dc1 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -34,6 +34,8 @@
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
 
+#include "uri/fetcher.hpp"
+
 using namespace process;
 
 namespace spec = docker::spec;
@@ -89,7 +91,14 @@ private:
 
 Try<Owned<slave::Store>> Store::create(const Flags& flags)
 {
-  Try<Owned<Puller>> puller = Puller::create(flags);
+  // TODO(jieyu): We should inject the URI fetcher from the top level, instead
+  // of creating it here.
+  Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
+  if (fetcher.isError()) {
+    return Error("Failed to create the URI fetcher: " + fetcher.error());
+  }
+
+  Try<Owned<Puller>> puller = Puller::create(flags, fetcher->share());
   if (puller.isError()) {
     return Error("Failed to create Docker puller: " + puller.error());
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 855812e..1c6a87b 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -124,16 +124,6 @@ mesos::internal::slave::Flags::Flags()
       "Directory the appc provisioner will store images in.\n",
       "/tmp/mesos/store/appc");
 
-  add(&Flags::docker_auth_server,
-      "docker_auth_server",
-      "Docker authentication server used to authenticate with Docker registry",
-      "https://auth.docker.io");
-
-  add(&Flags::docker_puller_timeout_secs,
-      "docker_puller_timeout",
-      "Timeout in seconds for pulling images from the Docker registry",
-      "60");
-
   add(&Flags::docker_registry,
       "docker_registry",
       "The default url for pulling Docker images. It could either be a Docker\n"

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 54c1a69..c079321 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -53,8 +53,6 @@ public:
   std::string appc_simple_discovery_uri_prefix;
   std::string appc_store_dir;
 
-  std::string docker_auth_server;
-  std::string docker_puller_timeout_secs;
   std::string docker_registry;
   std::string docker_store_dir;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index a18085e..4eb1faf 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -465,12 +465,10 @@ string Slave::Http::STATE_HELP() {
         "         \"work_dir\" : \"/tmp/mesos\",",
         "         \"launcher_dir\" : \"/path/to/mesos/build/src\",",
         "         \"registration_backoff_factor\" : \"1secs\",",
-        "         \"docker_auth_server\" : \"https://auth.docker.io\",",
         "         \"oversubscribed_resources_interval\" : \"15secs\",",
         "         \"enforce_container_disk_quota\" : \"false\",",
         "         \"container_disk_watch_interval\" : \"15secs\",",
         "         \"disk_watch_interval\" : \"1mins\",",
-        "         \"docker_puller_timeout\" : \"60\",",
         "         \"cgroups_limit_swap\" : \"false\",",
         "         \"hostname_lookup\" : \"true\",",
         "         \"perf_duration\" : \"10secs\",",

http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/tests/containerizer/provisioner_docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp
index c6b1e69..b3c6f88 100644
--- a/src/tests/containerizer/provisioner_docker_tests.cpp
+++ b/src/tests/containerizer/provisioner_docker_tests.cpp
@@ -14,984 +14,50 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <utility>
-
 #include <gmock/gmock.h>
 
-#include <gtest/gtest.h>
-
-#include <stout/duration.hpp>
-
 #include <stout/gtest.hpp>
 #include <stout/json.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
 #include <stout/stringify.hpp>
 
-#include <process/address.hpp>
-#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
-#include <process/io.hpp>
 #include <process/owned.hpp>
-#include <process/socket.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/ssl/gtest.hpp>
 
 #include <mesos/docker/spec.hpp>
 
 #include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
 #include "slave/containerizer/mesos/provisioner/docker/store.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp"
 
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
-namespace io = process::io;
+namespace paths = mesos::internal::slave::docker::paths;
 namespace slave = mesos::internal::slave;
 namespace spec = ::docker::spec;
 
-using std::list;
-using std::map;
-using std::pair;
 using std::string;
 using std::vector;
 
-using process::Clock;
 using process::Future;
 using process::Owned;
 using process::Promise;
-using process::Subprocess;
 
-using process::network::Socket;
+using slave::ImageInfo;
 
 using slave::docker::Puller;
 using slave::docker::RegistryPuller;
-
-using slave::docker::paths::getImageLayerRootfsPath;
-
-using slave::docker::registry::RegistryClient;
-using slave::docker::registry::Token;
-using slave::docker::registry::TokenManager;
+using slave::docker::Store;
 
 namespace mesos {
 namespace internal {
 namespace tests {
 
-/**
- * Provides token operations and defaults.
- */
-class TokenHelper {
-protected:
-  const string hdrBase64 = base64::encode(
-    "{ \
-      \"alg\":\"ES256\", \
-      \"typ\":\"JWT\", \
-      \"x5c\":[\"test\"] \
-    }");
-
-  string getClaimsBase64() const
-  {
-    return base64::encode(claimsJsonString);
-  }
-
-  string getTokenString() const
-  {
-    return  hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
-  }
-
-  string getDefaultTokenString()
-  {
-    // Construct response and send(server side).
-    const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-    claimsJsonString =
-      "{\"access\" \
-        :[ \
-        { \
-          \"type\":\"repository\", \
-            \"name\":\"library/busybox\", \
-            \"actions\":[\"pull\"]}], \
-            \"aud\":\"registry.docker.io\", \
-            \"exp\":" + stringify(expirySecs) + ", \
-            \"iat\":1438887168, \
-            \"iss\":\"auth.docker.io\", \
-            \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-            \"nbf\":1438887166, \
-            \"sub\":\"\" \
-        }";
-
-    return getTokenString();
-  }
-
-  const string signBase64 = base64::encode("{\"\"}");
-  string claimsJsonString;
-};
-
-
-/**
- * Fixture for testing TokenManager component.
- */
-class RegistryTokenTest : public TokenHelper, public ::testing::Test {};
-
-
-// Tests JSON Web Token parsing for a valid token string.
-TEST_F(RegistryTokenTest, ValidToken)
-{
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887168, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with expiration
-// date in the past.
-TEST_F(RegistryTokenTest, ExpiredToken)
-{
-  // Use an arbitrary fixed date that is far in the past (12 weeks
-  // after the Unix epoch).
-  const double expirySecs = Weeks(12).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  Try<Token> token = Token::create(getTokenString());
-
-  EXPECT_ERROR(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with no expiration date.
-TEST_F(RegistryTokenTest, NoExpiration)
-{
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-      }";
-
-  const Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with not-before date in the
-// future.
-TEST_F(RegistryTokenTest, NotBeforeInFuture)
-{
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-  const double nbfSecs = Clock::now().secs() + Days(7).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":" + stringify(nbfSecs) + ", \
-          \"sub\":\"\" \
-         }";
-
-  const Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-  ASSERT_EQ(token.get().isValid(), false);
-}
-
-
-#ifdef USE_SSL_SOCKET
-
-// Test suite for docker registry tests.
-class RegistryClientTest : public virtual SSLTest, public TokenHelper
-{
-protected:
-  RegistryClientTest() {}
-
-  Try<Socket> getServer() {
-    return setup_server({
-        {"SSL_ENABLED", "true"},
-        {"SSL_KEY_FILE", key_path().value},
-        {"SSL_CERT_FILE", certificate_path().value}});
-  }
-};
-
-
-// Tests TokenManager for a simple token request.
-TEST_F(RegistryClientTest, SimpleGetToken)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  // Create URL from server hostname and port.
-  const process::http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Construct response and send(server side).
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887168, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  const string tokenString(getTokenString());
-  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
-  const string buffer =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
-  AWAIT_ASSERT_READY(token);
-  ASSERT_EQ(token.get().raw, tokenString);
-}
-
-
-// Tests TokenManager for bad token response from server.
-TEST_F(RegistryClientTest, BadTokenResponse)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  // Create URL from server hostname and port.
-  const process::http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_ASSERT_READY(socket);
-
-  const string tokenString("bad token");
-  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
-  const string buffer =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
-  AWAIT_FAILED(token);
-}
-
-
-// Tests TokenManager for request to invalid server.
-TEST_F(RegistryClientTest, DISABLED_BadTokenServerAddress)
-{
-  // Create an invalid URL with current time.
-  const process::http::URL url("https", stringify(Clock::now().secs()), 0);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_FAILED(token);
-}
-
-
-// Tests docker registry's getManifest API.
-TEST_F(RegistryClientTest, SimpleGetManifest)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  const process::http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<RegistryClient>> registryClient =
-    RegistryClient::create(url, url, None());
-
-  ASSERT_SOME(registryClient);
-
-  Try<spec::ImageReference> reference =
-    spec::parseImageReference("library/busybox");
-  ASSERT_SOME(reference);
-
-  Future<spec::v2::ImageManifest> manifestResponse =
-    registryClient.get()->getManifest(reference.get());
-
-  const string unauthResponseHeaders = "Www-Authenticate: Bearer"
-    " realm=\"https://auth.docker.io/token\","
-    "service=" + stringify(server.get().address().get()) + ","
-    "scope=\"repository:library/busybox:pull\"";
-
-  const string unauthHttpResponse =
-    string("HTTP/1.1 401 Unauthorized\r\n") +
-    unauthResponseHeaders + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 401 Unauthorized response for a manifest request.
-  Future<string> manifestHttpRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(manifestHttpRequest);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
-  // Token response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  Future<string> tokenRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(tokenRequest);
-
-  const string tokenResponse =
-    "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
-  const string tokenHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
-  // Manifest response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  manifestHttpRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(manifestHttpRequest);
-
-  const string manifestJSON =
-    "{"
-    "   \"schemaVersion\": 1,"
-    "   \"name\": \"library/busybox\","
-    "   \"tag\": \"latest\","
-    "   \"architecture\": \"amd64\","
-    "   \"fsLayers\": ["
-    "      {"
-    "         \"blobSum\": "
-  "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
-    "      },"
-    "      {"
-    "         \"blobSum\": "
-  "\"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\""
-    "      },"
-    "      {"
-    "         \"blobSum\": "
-  "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
-    "      }"
-    "   ],"
-    "   \"history\": ["
-    "      {"
-    "         \"v1Compatibility\": "
-    "           \"{"
-    "             \\\"id\\\": "
-    "\\\"1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
-    "             \\\"parent\\\": "
-    "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
-    "           }\""
-    "      },"
-    "      {"
-    "         \"v1Compatibility\": "
-    "           \"{"
-    "             \\\"id\\\": "
-    "\\\"2ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
-    "             \\\"parent\\\": "
-    "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
-    "           }\""
-    "      },"
-    "      {"
-    "         \"v1Compatibility\": "
-    "           \"{"
-    "             \\\"id\\\": "
-    "\\\"3ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
-    "             \\\"parent\\\": "
-    "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
-    "           }\""
-    "      }"
-    "   ],"
-    "   \"signatures\": ["
-    "      {"
-    "         \"header\": {"
-    "            \"jwk\": {"
-    "               \"crv\": \"P-256\","
-    "               \"kid\": "
-    "\"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\","
-    "               \"kty\": \"EC\","
-    "               \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\","
-    "               \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\""
-    "            },"
-    "            \"alg\": \"ES256\""
-    "         },"
-    "         \"signature\": \"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgO"
-    "EarAs7_4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\","
-    "         \"protected\": \"eyJmb3JtYXRMZW5ndGgiOjYwNjMsImZvcm1hdFRhaWwiOiJ"
-    "DbjAiLCJ0aW1lIjoiMjAxNC0wOS0xMVQxNzoxNDozMFoifQ\""
-    "      }"
-    "   ]"
-    "}";
-
-  const string manifestHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(manifestJSON.length()) + "\r\n" +
-    "Docker-Content-Digest: "
-    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
-    + "\r\n" +
-    "\r\n" +
-    manifestJSON;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
-
-  AWAIT_ASSERT_READY(manifestResponse);
-
-  EXPECT_EQ(
-      manifestResponse.get().history(2).v1().id(),
-      "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
-  EXPECT_EQ(
-      manifestResponse.get().history(1).v1().id(),
-      "2ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
-  EXPECT_EQ(
-      manifestResponse.get().history(0).v1().id(),
-      "3ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-}
-
-
-// Tests docker registry's getBlob API.
-TEST_F(RegistryClientTest, SimpleGetBlob)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  Try<Socket> blobServer = getServer();
-
-  ASSERT_SOME(blobServer);
-  ASSERT_SOME(blobServer.get().address());
-  ASSERT_SOME(blobServer.get().address().get().hostname());
-
-  Future<Socket> blobServerAcceptSocket = blobServer.get().accept();
-
-  const process::http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<RegistryClient>> registryClient =
-    RegistryClient::create(url, url, None());
-
-  ASSERT_SOME(registryClient);
-
-  const Path blobPath(path::join(os::getcwd(), "blob"));
-
-  Try<spec::ImageReference> reference = spec::parseImageReference("blob");
-  ASSERT_SOME(reference);
-
-  Future<size_t> result =
-    registryClient.get()->getBlob(
-        reference.get(),
-        "digest",
-        blobPath);
-
-  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
-    " realm=\"https://auth.docker.io/token\","
-    "service=" + stringify(server.get().address().get()) + ","
-    "scope=\"repository:library/busybox:pull\"";
-
-  const string unauthHttpResponse =
-    string("HTTP/1.1 401 Unauthorized\r\n") +
-    unauthResponseHeaders + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 401 Unauthorized response.
-  Future<string> blobHttpRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequest);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
-  // Send token response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  Future<string> tokenRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(tokenRequest);
-
-  const string tokenResponse =
-    "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
-  const string tokenHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
-  // Send redirect.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  blobHttpRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequest);
-
-  const string redirectHttpResponse =
-    string("HTTP/1.1 307 Temporary Redirect\r\n") +
-    "Location: https://" +
-    blobServer.get().address().get().hostname().get() + ":" +
-    stringify(blobServer.get().address().get().port) + "/blob \r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
-
-  // Finally send blob response.
-  AWAIT_ASSERT_READY(blobServerAcceptSocket);
-
-  blobHttpRequest = Socket(blobServerAcceptSocket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequest);
-
-  const string blobResponse = stringify(Clock::now());
-
-  const string blobHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(blobResponse.length()) + "\r\n" +
-    "\r\n" +
-    blobResponse;
-
-  AWAIT_ASSERT_READY(Socket(blobServerAcceptSocket.get()).send(
-      blobHttpResponse));
-
-  AWAIT_ASSERT_READY(result);
-
-  Try<string> blob = os::read(blobPath);
-  ASSERT_SOME(blob);
-  ASSERT_EQ(blob.get(), blobResponse);
-}
-
-
-TEST_F(RegistryClientTest, BadRequest)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  const process::http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<RegistryClient>> registryClient =
-    RegistryClient::create(url, url, None());
-
-  ASSERT_SOME(registryClient);
-
-  const Path blobPath(path::join(os::getcwd(), "blob"));
-
-  Try<spec::ImageReference> reference = spec::parseImageReference("blob");
-  ASSERT_SOME(reference);
-
-  Future<size_t> result =
-    registryClient.get()->getBlob(
-        reference.get(),
-        "digest",
-        blobPath);
-
-  const string badRequestResponse =
-    "{\"errors\": [{\"message\": \"Error1\" }, {\"message\": \"Error2\"}]}";
-
-  const string badRequestHttpResponse =
-    string("HTTP/1.1 400 Bad Request\r\n") +
-    "Content-Length : " + stringify(badRequestResponse.length()) + "\r\n" +
-    "\r\n" +
-    badRequestResponse;
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 400 Bad Request.
-  Future<string> blobHttpRequest = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequest);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(badRequestHttpResponse));
-
-  AWAIT_FAILED(result);
-
-  ASSERT_TRUE(strings::contains(result.failure(), "Error1"));
-  ASSERT_TRUE(strings::contains(result.failure(), "Error2"));
-}
-
-
-// Tests docker RegistryPuller component. It simulates pulling an image layer
-// from remote registry and then verifies the content saved on disk.
-TEST_F(RegistryClientTest, SimpleRegistryPuller)
-{
-  Try<Socket> server = getServer();
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  Try<Socket> blobServer = getServer();
-
-  ASSERT_SOME(blobServer);
-  ASSERT_SOME(blobServer.get().address());
-  ASSERT_SOME(blobServer.get().address().get().hostname());
-
-  Future<Socket> blobServerAcceptSock = blobServer.get().accept();
-
-  slave::Flags flags;
-  process::network::Address address = server.get().address().get();
-  const string url = "https://" + address.hostname().get() + ":" +
-                     stringify(address.port);
-  flags.docker_registry = url;
-  flags.docker_auth_server = url;
-
-  Try<Owned<Puller>> registryPuller = RegistryPuller::create(flags);
-  ASSERT_SOME(registryPuller);
-
-  const string registryPullerPath = os::getcwd();
-
-  Try<spec::ImageReference> reference = spec::parseImageReference("busybox");
-  ASSERT_SOME(reference);
-
-  Future<vector<string>> registryPullerFuture =
-    registryPuller.get()->pull(reference.get(), registryPullerPath);
-
-  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
-    " realm=\"https://auth.docker.io/token\","
-    "service=" + stringify(server.get().address().get()) + ","
-    "scope=\"repository:library/busybox:pull\"";
-
-  const string unauthHttpResponse =
-    string("HTTP/1.1 401 Unauthorized\r\n") +
-    unauthResponseHeaders + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 401 Unauthorized response for a manifest request.
-  Future<string> registryPullerHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
-  // Token response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(tokenRequestFuture);
-
-  const string tokenResponse =
-    "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
-  const string tokenHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
-  // Manifest response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  registryPullerHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
-  const string manifestResponse =
-    "{"
-    "   \"schemaVersion\": 1,"
-    "   \"name\": \"library/busybox\","
-    "   \"tag\": \"latest\","
-    "   \"architecture\": \"amd64\","
-    "   \"fsLayers\": ["
-    "      {"
-    "         \"blobSum\": "
-  "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
-    "      }"
-    "   ],"
-    "   \"history\": ["
-    "      {"
-    "         \"v1Compatibility\": "
-    "           \"{"
-    "             \\\"id\\\": "
-    "\\\"1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
-    "             \\\"parent\\\": "
-    "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
-    "           }\""
-    "      }"
-    "   ],"
-    "   \"signatures\": ["
-    "      {"
-    "         \"header\": {"
-    "            \"jwk\": {"
-    "               \"crv\": \"P-256\","
-    "               \"kid\": "
-    "\"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\","
-    "               \"kty\": \"EC\","
-    "               \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\","
-    "               \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\""
-    "            },"
-    "            \"alg\": \"ES256\""
-    "         },"
-    "         \"signature\": \"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgO"
-    "EarAs7_4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\","
-    "         \"protected\": \"eyJmb3JtYXRMZW5ndGgiOjYwNjMsImZvcm1hdFRhaWwiOiJ"
-    "DbjAiLCJ0aW1lIjoiMjAxNC0wOS0xMVQxNzoxNDozMFoifQ\""
-    "      }"
-    "   ]"
-    "}";
-
-  const string manifestHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(manifestResponse.length()) + "\r\n" +
-    "Docker-Content-Digest: "
-    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
-    + "\r\n" +
-    "\r\n" +
-    manifestResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
-
-  // Redirect response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  registryPullerHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
-  const string redirectHttpResponse =
-    string("HTTP/1.1 307 Temporary Redirect\r\n") +
-    "Content-Length : 0\r\n" +
-    "Location: https://" +
-    blobServer.get().address().get().hostname().get() + ":" +
-    stringify(blobServer.get().address().get().port) + "/blob \r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
-
-  AWAIT_ASSERT_READY(blobServerAcceptSock);
-
-  registryPullerHttpRequestFuture = Socket(blobServerAcceptSock.get()).recv();
-  AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
-  // Prepare the blob response from the server. The blob response buffer is a
-  // tarball. So we create a tarball of our test response and send that.
-  const string blobFile = "blob";
-  const string blobResponse = "hello docker";
-
-  Path blobPath(path::join(registryPullerPath, blobFile));
-  ASSERT_SOME(os::write(blobPath, blobResponse));
-
-  Path blobTarPath(path::join(registryPullerPath, blobFile + ".tar"));
-
-  vector<string> argv = {
-    "tar",
-    "-C",
-    registryPullerPath,
-    "-c",
-    "-f",
-    blobTarPath,
-    blobFile
-  };
-
-  Try<Subprocess> s = subprocess(
-      "tar",
-      argv,
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH("/dev/null"),
-      Subprocess::PATH("/dev/null"));
-
-  ASSERT_SOME(s);
-  AWAIT_ASSERT_READY(s.get().status());
-
-  Try<Bytes> tarSize = os::stat::size(blobTarPath);
-  ASSERT_SOME(tarSize);
-
-  ASSERT_SOME(os::rm(blobPath));
-
-  std::unique_ptr<char[]> tarBuffer(new char[tarSize.get().bytes()]);
-  ASSERT_NE(tarBuffer.get(), nullptr);
-
-  Try<int> fd = os::open(
-      blobTarPath,
-      O_RDONLY,
-      S_IRUSR | S_IRGRP | S_IROTH);
-  ASSERT_SOME(fd);
-
-  ASSERT_SOME(os::nonblock(fd.get()));
-
-  AWAIT_ASSERT_READY(
-      io::read(fd.get(), tarBuffer.get(), tarSize.get().bytes()));
-
-  ASSERT_SOME(os::close(fd.get()));
-
-  const string blobHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-type : application/octet-stream\r\n" +
-    "Content-Length : " +
-    stringify(tarSize.get().bytes()) + "\r\n" +
-    "\r\n";
-
-  const size_t blobResponseSize =
-    blobHttpResponse.length() + tarSize.get().bytes();
-
-  std::unique_ptr<char[]> responseBuffer(new char[blobResponseSize]);
-  ASSERT_NE(responseBuffer.get(), nullptr);
-
-  memcpy(
-      responseBuffer.get(),
-      blobHttpResponse.c_str(),
-      blobHttpResponse.length());
-
-  memcpy(
-      responseBuffer.get() + blobHttpResponse.length(),
-      tarBuffer.get(),
-      tarSize.get().bytes());
-
-  AWAIT_ASSERT_READY(Socket(blobServerAcceptSock.get()).send(
-      responseBuffer.get(),
-      blobResponseSize));
-
-  AWAIT_ASSERT_READY(registryPullerFuture);
-  vector<string> layers = registryPullerFuture.get();
-  ASSERT_EQ(1u, layers.size());
-  ASSERT_EQ(layers.front(),
-            "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
-  Try<string> blob = os::read(path::join(
-      registryPullerPath,
-      layers.front(),
-      "rootfs",
-      blobFile));
-
-  ASSERT_SOME(blob);
-  ASSERT_EQ(blob.get(), blobResponse);
-}
-
-#endif // USE_SSL_SOCKET
-
-
 class ProvisionerDockerLocalStoreTest : public TemporaryDirectoryTest
 {
 public:
@@ -999,20 +65,22 @@ public:
       const slave::Flags& flags,
       const vector<string>& layers)
   {
-    const string layersPath = path::join(flags.docker_store_dir, "layers");
-
     // Verify contents of the image in store directory.
-    const string layerPath1 =
-      getImageLayerRootfsPath(flags.docker_store_dir, "123");
+    const string layerPath1 = paths::getImageLayerRootfsPath(
+        flags.docker_store_dir,
+        "123");
 
-    const string layerPath2 =
-      getImageLayerRootfsPath(flags.docker_store_dir, "456");
+    const string layerPath2 = paths::getImageLayerRootfsPath(
+        flags.docker_store_dir,
+        "456");
 
     EXPECT_TRUE(os::exists(layerPath1));
     EXPECT_TRUE(os::exists(layerPath2));
+
     EXPECT_SOME_EQ(
         "foo 123",
         os::read(path::join(layerPath1 , "temp")));
+
     EXPECT_SOME_EQ(
         "bar 456",
         os::read(path::join(layerPath2, "temp")));
@@ -1238,6 +306,30 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
   EXPECT_EQ(imageInfo1.get().layers, imageInfo2.get().layers);
 }
 
+
+class ProvisionerDockerRegistryPullerTest : public TemporaryDirectoryTest {};
+
+
+TEST_F(ProvisionerDockerRegistryPullerTest, INTERNET_CURL_Pull)
+{
+  slave::Flags flags;
+  flags.docker_registry = "https://registry-1.docker.io";
+  flags.docker_store_dir = os::getcwd();
+
+  Try<Owned<slave::Store>> store = Store::create(flags);
+  ASSERT_SOME(store);
+
+  Image image;
+  image.set_type(Image::DOCKER);
+  image.mutable_docker()->set_name("library/alpine");
+
+  Future<ImageInfo> imageInfo = store.get()->get(image);
+  AWAIT_READY_FOR(imageInfo, Seconds(60));
+
+  EXPECT_LE(1u, imageInfo->layers.size());
+  EXPECT_SOME(imageInfo->dockerManifest);
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {