You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gi...@apache.org on 2019/04/05 05:56:01 UTC

[mesos] 04/08: Supported docker manifest v2 schema2.

This is an automated email from the ASF dual-hosted git repository.

gilbert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 9f040066eaf0c6b742b0d86384fd71aae2036f26
Author: Gilbert Song <so...@gmail.com>
AuthorDate: Fri Mar 22 00:32:30 2019 -0700

    Supported docker manifest v2 schema2.
    
    Review: https://reviews.apache.org/r/70288
---
 .../mesos/provisioner/docker/registry_puller.cpp   | 266 +++++++++++++++---
 .../mesos/provisioner/docker/store.cpp             |  52 +++-
 src/uri/fetchers/docker.cpp                        | 296 ++++++++-------------
 3 files changed, 392 insertions(+), 222 deletions(-)

diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
index 7778976..35b6afb 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
@@ -25,6 +25,7 @@
 
 #include <stout/os/exists.hpp>
 #include <stout/os/mkdir.hpp>
+#include <stout/os/rename.hpp>
 #include <stout/os/rm.hpp>
 #include <stout/os/write.hpp>
 
@@ -92,13 +93,34 @@ private:
     const hashset<string>& blobSums,
     const string& backend);
 
-  Future<hashset<string>> fetchBlobs(
+  Future<Image> ____pull(
     const spec::ImageReference& reference,
     const string& directory,
+    const spec::v2_2::ImageManifest& manifest,
+    const hashset<string>& digests,
+    const string& backend);
+
+  Future<hashset<string>> fetchBlobs(
+    const spec::ImageReference& normalizedRef,
+    const string& directory,
     const spec::v2::ImageManifest& manifest,
     const string& backend,
     const Option<Secret::Value>& config);
 
+  Future<hashset<string>> fetchBlobs(
+      const spec::ImageReference& normalizedRef,
+      const string& directory,
+      const spec::v2_2::ImageManifest& manifest,
+      const string& backend,
+      const Option<Secret::Value>& config);
+
+  Future<hashset<string>> fetchBlobs(
+      const spec::ImageReference& normalizedRef,
+      const string& directory,
+      const hashset<string>& digests,
+      const string& backend,
+      const Option<Secret::Value>& config);
+
   RegistryPullerProcess(const RegistryPullerProcess&) = delete;
   RegistryPullerProcess& operator=(const RegistryPullerProcess&) = delete;
 
@@ -237,31 +259,31 @@ Future<Image> RegistryPullerProcess::pull(
 
 
 Future<Image> RegistryPullerProcess::_pull(
-    const spec::ImageReference& _reference,
+    const spec::ImageReference& reference,
     const string& directory,
     const string& backend,
     const Option<Secret::Value>& config)
 {
-  spec::ImageReference reference = normalize(_reference, defaultRegistryUrl);
+  spec::ImageReference normalizedRef = normalize(reference, defaultRegistryUrl);
 
   URI manifestUri;
-  if (reference.has_registry()) {
-    Result<int> port = spec::getRegistryPort(reference.registry());
+  if (normalizedRef.has_registry()) {
+    Result<int> port = spec::getRegistryPort(normalizedRef.registry());
     if (port.isError()) {
       return Failure("Failed to get registry port: " + port.error());
     }
 
-    Try<string> scheme = spec::getRegistryScheme(reference.registry());
+    Try<string> scheme = spec::getRegistryScheme(normalizedRef.registry());
     if (scheme.isError()) {
       return Failure("Failed to get registry scheme: " + scheme.error());
     }
 
     manifestUri = uri::docker::manifest(
-        reference.repository(),
-        (reference.has_digest()
-          ? reference.digest()
-          : (reference.has_tag() ? reference.tag() : "latest")),
-        spec::getRegistryHost(reference.registry()),
+        normalizedRef.repository(),
+        (normalizedRef.has_digest()
+          ? normalizedRef.digest()
+          : (normalizedRef.has_tag() ? normalizedRef.tag() : "latest")),
+        spec::getRegistryHost(normalizedRef.registry()),
         scheme.get(),
         port.isSome() ? port.get() : Option<int>());
   } else {
@@ -274,19 +296,24 @@ Future<Image> RegistryPullerProcess::_pull(
       : Option<int>();
 
     manifestUri = uri::docker::manifest(
-        reference.repository(),
-        (reference.has_digest()
-          ? reference.digest()
-          : (reference.has_tag() ? reference.tag() : "latest")),
+        normalizedRef.repository(),
+        (normalizedRef.has_digest()
+          ? normalizedRef.digest()
+          : (normalizedRef.has_tag() ? normalizedRef.tag() : "latest")),
         registry,
         defaultRegistryUrl.scheme,
         port);
   }
 
-  VLOG(1) << "Pulling image '" << reference
+  VLOG(1) << "Pulling image '" << normalizedRef
           << "' from '" << manifestUri
           << "' to '" << directory << "'";
 
+  // Pass the original 'reference' along to subsequent methods
+  // because the metadata manager may already have this reference
+  // in its cache. This is necessary to ensure backward compatibility
+  // after upgrading to a version that includes MESOS-9675 (docker
+  // manifest v2 schema2 support).
   return fetcher->fetch(
       manifestUri,
       directory,
@@ -306,21 +333,57 @@ Future<Image> RegistryPullerProcess::__pull(
     return Failure("Failed to read the manifest: " + _manifest.error());
   }
 
-  Try<spec::v2::ImageManifest> manifest = spec::v2::parse(_manifest.get());
+  VLOG(1) << "The manifest for image '" << reference << "' is '"
+          << _manifest.get() << "'";
+
+  // To ensure backward compatibility across upgrades for docker
+  // manifest v2 schema2 support, it is unavoidable to call
+  // 'normalize()' twice: an image may already have been cached by
+  // the metadata manager before the upgrade, and the metadata
+  // manager persists the image information constructed by the
+  // registry puller. Please see MESOS-9675 for details.
+  spec::ImageReference normalizedRef = normalize(reference, defaultRegistryUrl);
+
+  Try<JSON::Object> json = JSON::parse<JSON::Object>(_manifest.get());
+  if (json.isError()) {
+    return Failure("Failed to parse the manifest JSON: " + json.error());
+  }
+
+  Result<JSON::Number> schemaVersion = json->at<JSON::Number>("schemaVersion");
+  if (schemaVersion.isError()) {
+    return Failure(
+        "Failed to find manifest schema version: " + schemaVersion.error());
+  }
+
+  if (schemaVersion.isSome() && schemaVersion->as<int>() == 2) {
+    Try<spec::v2_2::ImageManifest> manifest = spec::v2_2::parse(json.get());
+    if (manifest.isError()) {
+      return Failure("Failed to parse the manifest: " + manifest.error());
+    }
+
+    return fetchBlobs(normalizedRef, directory, manifest.get(), backend, config)
+      .then(defer(self(),
+                  &Self::____pull,
+                  reference,
+                  directory,
+                  manifest.get(),
+                  lambda::_1,
+                  backend));
+  }
+
+  // By default treat the manifest format as schema 1.
+  Try<spec::v2::ImageManifest> manifest = spec::v2::parse(json.get());
   if (manifest.isError()) {
     return Failure("Failed to parse the manifest: " + manifest.error());
   }
 
-  VLOG(1) << "The manifest for image '" << reference << "' is '"
-          << _manifest.get() << "'";
-
   // NOTE: This can be a CHECK (i.e., shouldn't happen). However, in
   // case docker has bugs, we return a Failure instead.
   if (manifest->fslayers_size() != manifest->history_size()) {
     return Failure("'fsLayers' and 'history' have different size in manifest");
   }
 
-  return fetchBlobs(reference, directory, manifest.get(), backend, config)
+  return fetchBlobs(normalizedRef, directory, manifest.get(), backend, config)
     .then(defer(self(),
                 &Self::___pull,
                 reference,
@@ -437,9 +500,94 @@ Future<Image> RegistryPullerProcess::___pull(
 }
 
 
-Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
+Future<Image> RegistryPullerProcess::____pull(
     const spec::ImageReference& reference,
     const string& directory,
+    const spec::v2_2::ImageManifest& manifest,
+    const hashset<string>& digests,
+    const string& backend)
+{
+  hashset<string> uniqueIds;
+  vector<string> layerIds;
+  vector<Future<Nothing>> futures;
+
+  for (int i = 0; i < manifest.layers_size(); i++) {
+    const string& digest = manifest.layers(i).digest();
+    if (uniqueIds.contains(digest)) {
+      continue;
+    }
+
+    layerIds.push_back(digest);
+    uniqueIds.insert(digest);
+
+    // Skip if the layer is already in the store.
+    if (os::exists(paths::getImageLayerRootfsPath(storeDir, digest, backend))) {
+      continue;
+    }
+
+    const string layerPath = path::join(directory, digest);
+    const string originalTar = path::join(directory, digest);
+    const string tar = path::join(directory, digest + "-archive");
+    const string rootfs = paths::getImageLayerRootfsPath(layerPath, backend);
+
+    VLOG(1) << "Moving layer tar ball '" << originalTar
+            << "' to '" << tar << "'";
+
+    // Move layer tar ball to use its name for the extracted layer directory.
+    Try<Nothing> rename = os::rename(originalTar, tar);
+    if (rename.isError()) {
+      return Failure(
+          "Failed to move the layer tar ball from '" + originalTar +
+          "' to '" + tar + "': " + rename.error());
+    }
+
+    VLOG(1) << "Extracting layer tar ball '" << tar
+            << "' to rootfs '" << rootfs << "'";
+
+    // NOTE: This will create 'layerPath' as well.
+    Try<Nothing> mkdir = os::mkdir(rootfs, true);
+    if (mkdir.isError()) {
+      return Failure(
+          "Failed to create rootfs directory '" + rootfs + "' "
+          "for layer '" + digest + "': " + mkdir.error());
+    }
+
+    futures.push_back(command::untar(Path(tar), Path(rootfs)));
+  }
+
+  return collect(futures)
+    .then([=]() -> Future<Image> {
+      // Remove the tarballs after the extraction.
+      foreach (const string& digest, digests) {
+        // Skip if the digest represents the image manifest config.
+        if (digest == manifest.config().digest()) {
+          continue;
+        }
+
+        const string tar = path::join(directory, digest + "-archive");
+
+        Try<Nothing> rm = os::rm(tar);
+        if (rm.isError()) {
+          return Failure(
+              "Failed to remove '" + tar + "' after extraction: " + rm.error());
+        }
+      }
+
+      Image image;
+      image.set_config_digest(manifest.config().digest());
+      image.mutable_reference()->CopyFrom(reference);
+      foreach (const string& layerId, layerIds) {
+        image.add_layer_ids(layerId);
+      }
+
+      return image;
+    });
+}
+
+
+Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
+    const spec::ImageReference& normalizedRef,
+    const string& directory,
     const spec::v2::ImageManifest& manifest,
     const string& backend,
     const Option<Secret::Value>& config)
@@ -464,24 +612,74 @@ Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
     const string& blobSum = manifest.fslayers(i).blobsum();
 
     VLOG(1) << "Fetching blob '" << blobSum << "' for layer '"
-            << v1.id() << "' of image '" << reference << "'";
+            << v1.id() << "' of image '" << normalizedRef << "'";
 
     blobSums.insert(blobSum);
   }
 
-  // Now, actually fetch the blobs.
+  return fetchBlobs(normalizedRef, directory, blobSums, backend, config);
+}
+
+
+Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
+    const spec::ImageReference& normalizedRef,
+    const string& directory,
+    const spec::v2_2::ImageManifest& manifest,
+    const string& backend,
+    const Option<Secret::Value>& config)
+{
+  // First, find all the blobs that need to be fetched.
+  //
+  // NOTE: There might exist duplicated digests in 'layers'. We
+  // just need to fetch one of them.
+  hashset<string> digests;
+
+  const string& configDigest = manifest.config().digest();
+  if (!os::exists(paths::getImageLayerPath(storeDir, configDigest))) {
+    VLOG(1) << "Fetching config '" << configDigest << "' for image '"
+            << normalizedRef << "'";
+
+    digests.insert(configDigest);
+  }
+
+  for (int i = 0; i < manifest.layers_size(); i++) {
+    const string& digest = manifest.layers(i).digest();
+
+    // Check if the layer is in the store or not. If yes, skip the unnecessary
+    // fetching.
+    if (os::exists(paths::getImageLayerRootfsPath(storeDir, digest, backend))) {
+      continue;
+    }
+
+    VLOG(1) << "Fetching layer '" << digest << "' for image '"
+            << normalizedRef << "'";
+
+    digests.insert(digest);
+  }
+
+  return fetchBlobs(normalizedRef, directory, digests, backend, config);
+}
+
+
+Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
+    const spec::ImageReference& normalizedRef,
+    const string& directory,
+    const hashset<string>& digests,
+    const string& backend,
+    const Option<Secret::Value>& config)
+{
   vector<Future<Nothing>> futures;
 
-  foreach (const string& blobSum, blobSums) {
+  foreach (const string& digest, digests) {
     URI blobUri;
 
-    if (reference.has_registry()) {
-      Result<int> port = spec::getRegistryPort(reference.registry());
+    if (normalizedRef.has_registry()) {
+      Result<int> port = spec::getRegistryPort(normalizedRef.registry());
       if (port.isError()) {
         return Failure("Failed to get registry port: " + port.error());
       }
 
-      Try<string> scheme = spec::getRegistryScheme(reference.registry());
+      Try<string> scheme = spec::getRegistryScheme(normalizedRef.registry());
       if (scheme.isError()) {
         return Failure("Failed to get registry scheme: " + scheme.error());
       }
@@ -490,9 +688,9 @@ Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
       // an URL scheme must be specified in '--docker_registry', because
       // there is no scheme allowed in docker image name.
       blobUri = uri::docker::blob(
-          reference.repository(),
-          blobSum,
-          spec::getRegistryHost(reference.registry()),
+          normalizedRef.repository(),
+          digest,
+          spec::getRegistryHost(normalizedRef.registry()),
           scheme.get(),
           port.isSome() ? port.get() : Option<int>());
     } else {
@@ -505,8 +703,8 @@ Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
         : Option<int>();
 
       blobUri = uri::docker::blob(
-          reference.repository(),
-          blobSum,
+          normalizedRef.repository(),
+          digest,
           registry,
           defaultRegistryUrl.scheme,
           port);
@@ -519,7 +717,7 @@ Future<hashset<string>> RegistryPullerProcess::fetchBlobs(
   }
 
   return collect(futures)
-    .then([blobSums]() -> hashset<string> { return blobSums; });
+    .then([digests]() -> hashset<string> { return digests; });
 }
 
 } // namespace docker {
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index e0f2371..909364d 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -326,6 +326,13 @@ Future<Image> StoreProcess::_get(
       }
     }
 
+    if (image->has_config_digest() &&
+        !os::exists(paths::getImageLayerPath(
+            flags.docker_store_dir,
+            image->config_digest()))) {
+      layerMissed = true;
+    }
+
     if (!layerMissed) {
       return image.get();
     }
@@ -393,16 +400,25 @@ Future<ImageInfo> StoreProcess::__get(
         backend));
   }
 
-  const string path = paths::getImageLayerManifestPath(
-      flags.docker_store_dir,
-      image.layer_ids(image.layer_ids_size() - 1));
+  string configPath;
+  if (image.has_config_digest()) {
+    // The optional 'config_digest' is only set in the docker
+    // manifest V2 Schema2 case.
+    configPath = paths::getImageLayerPath(
+        flags.docker_store_dir,
+        image.config_digest());
+  } else {
+    // Read the manifest from the last layer because all runtime
+    // configs are already merged at the leaf.
+    configPath = paths::getImageLayerManifestPath(
+        flags.docker_store_dir,
+        image.layer_ids(image.layer_ids_size() - 1));
+  }
 
-  // Read the manifest from the last layer because all runtime config
-  // are merged at the leaf already.
-  Try<string> manifest = os::read(path);
+  Try<string> manifest = os::read(configPath);
   if (manifest.isError()) {
     return Failure(
-        "Failed to read manifest from '" + path + "': " +
+        "Failed to read manifest from '" + configPath + "': " +
         manifest.error());
   }
 
@@ -411,7 +427,7 @@ Future<ImageInfo> StoreProcess::__get(
 
   if (v1.isError()) {
     return Failure(
-        "Failed to parse docker v1 manifest from '" + path + "': " +
+        "Failed to parse docker v1 manifest from '" + configPath + "': " +
         v1.error());
   }
 
@@ -430,7 +446,25 @@ Future<Image> StoreProcess::moveLayers(
   }
 
   return collect(futures)
-    .then([image]() -> Image { return image; });
+    .then([=]() -> Future<Image> {
+      if (image.has_config_digest()) {
+        const string configSource = path::join(staging, image.config_digest());
+        const string configTarget = paths::getImageLayerPath(
+            flags.docker_store_dir,
+            image.config_digest());
+
+        if (!os::exists(configTarget)) {
+          Try<Nothing> rename = os::rename(configSource, configTarget);
+          if (rename.isError()) {
+            return Failure(
+                "Failed to move image manifest config from '" + configSource +
+                "' to '" + configTarget + "': " + rename.error());
+          }
+        }
+      }
+
+      return image;
+    });
 }
 
 
diff --git a/src/uri/fetchers/docker.cpp b/src/uri/fetchers/docker.cpp
index ffb5194..e11e54f 100644
--- a/src/uri/fetchers/docker.cpp
+++ b/src/uri/fetchers/docker.cpp
@@ -458,19 +458,11 @@ private:
       const http::Headers& authHeaders,
       const http::Response& response);
 
-  Future<Nothing> ___fetch(
+  Future<Nothing> fetchBlobs(
       const URI& uri,
       const string& directory,
-      const http::Headers& authHeaders,
-      const spec::v2::ImageManifest& manifest);
-
-  Try<spec::v2::ImageManifest> saveV2S1Manifest(
-      const string& directory,
-      const http::Response& response);
-
-  Try<spec::v2_2::ImageManifest> saveV2S2Manifest(
-      const string& directory,
-      const http::Response& response);
+      const hashset<string>& digests,
+      const http::Headers& authHeaders);
 
   Future<Nothing> fetchBlob(
       const URI& uri,
@@ -681,13 +673,14 @@ Future<Nothing> DockerFetcherPluginProcess::fetch(
 
   URI manifestUri = getManifestUri(uri);
 
-  // Request a Version 2 Schema 1 manifest. The MIME type of a Schema 1
-  // manifest is described in the following link:
-  // https://docs.docker.com/registry/spec/manifest-v2-1/
-  // Note: The 'Accept' header is required for Amazon ECR. See:
-  // https://forums.aws.amazon.com/message.jspa?messageID=780440
+  // Both docker manifest v2s1 and v2s2 are supported. We list all
+  // media types in the 'Accept' header of the manifest request because:
+  // 1. v2+json is needed since some registries have started to
+  //    deprecate schema 1 support.
+  // 2. Some registries only support one schema type.
   http::Headers manifestHeaders = {
     {"Accept",
+     "application/vnd.docker.distribution.manifest.v2+json,"
      "application/vnd.docker.distribution.manifest.v1+json,"
      "application/vnd.docker.distribution.manifest.v1+prettyjws"
     }
@@ -738,71 +731,129 @@ Future<Nothing> DockerFetcherPluginProcess::__fetch(
     const http::Headers& authHeaders,
     const http::Response& response)
 {
-  Try<spec::v2::ImageManifest> manifest =
-      saveV2S1Manifest(directory, response);
-
-  if (manifest.isError()) {
-    return Failure(manifest.error());
+  if (response.code != http::Status::OK) {
+    return Failure(
+        "Unexpected HTTP response '" + response.status + "' "
+        "when trying to get the manifest");
   }
 
-#ifdef __WINDOWS__
-  URI manifestUri = getManifestUri(uri);
+  CHECK_EQ(response.type, http::Response::BODY);
 
-  // Fetching version 2 schema 2 manifest:
-  // https://docs.docker.com/registry/spec/manifest-v2-2/
+  Option<string> contentType = response.headers.get("Content-Type");
+  if (contentType.isNone()) {
+    return Failure("No Content-Type present");
+  }
+
+  // NOTE: Docker supports the following five media types.
   //
-  // If fetch is failed, program continues without schema 2 manifest.
+  // V2 schema 1 manifest:
+  // 1. application/vnd.docker.distribution.manifest.v1+json
+  // 2. application/vnd.docker.distribution.manifest.v1+prettyjws
+  // 3. application/json
   //
-  // Schema 2 manifest may have foreign URLs to fetch blobs from servers
-  // other than Docker Hub. Some file layers, for example, Windows OS
-  // layers are only stored on Microsoft servers, so only with schema 2
-  // manifest, such layers can be successfully fetched.
-  http::Headers s2ManifestHeaders = {
-    {"Accept", "application/vnd.docker.distribution.manifest.v2+json"}
-  };
+  // For more details, see:
+  // https://docs.docker.com/registry/spec/manifest-v2-1/
+  //
+  // V2 schema 2 manifest:
+  // 1. application/vnd.docker.distribution.manifest.v2+json
+  // 2. application/vnd.docker.distribution.manifest.list.v2+json
+  //    (manifest list is not supported yet)
+  //
+  // For more details, see:
+  // https://docs.docker.com/registry/spec/manifest-v2-2/
+  bool isV2Schema1 =
+    strings::startsWith(
+        contentType.get(),
+        "application/vnd.docker.distribution.manifest.v1") ||
+    strings::startsWith(
+        contentType.get(),
+        "application/json");
+
+  // TODO(gilbert): Support manifest list (fat manifest) in V2 Schema2.
+  bool isV2Schema2 =
+    contentType.get() == "application/vnd.docker.distribution.manifest.v2+json";
+
+  if (isV2Schema1) {
+    // Parse V2 schema 1 image manifest.
+    Try<spec::v2::ImageManifest> manifest = spec::v2::parse(response.body);
+    if (manifest.isError()) {
+      return Failure(
+          "Failed to parse the V2 Schema 1 image manifest: " +
+          manifest.error());
+    }
 
-  return curl(manifestUri, s2ManifestHeaders + authHeaders, stallTimeout)
-      .then(defer(self(), [=](const http::Response& response)
-          -> Future<Nothing> {
-        Try<spec::v2_2::ImageManifest> manifest =
-            saveV2S2Manifest(directory, response);
+    // Save manifest to 'directory'.
+    Try<Nothing> write = os::write(
+        path::join(directory, "manifest"), response.body);
 
-        if (manifest.isError()) {
-          LOG(WARNING) << "Failed to fetch schema 2 manifest: "
-                       << manifest.error();
-        }
+    if (write.isError()) {
+      return Failure(
+          "Failed to write the V2 Schema 1 image manifest to "
+          "'" + directory + "': " + write.error());
+    }
 
-        return Nothing();
-      }))
-      .then(defer(self(),
-                  &Self::___fetch,
-                  uri,
-                  directory,
-                  authHeaders,
-                  manifest.get()));
-#else
-  return ___fetch(uri, directory, authHeaders, manifest.get());
-#endif
+    // No need to proceed if we only want manifest.
+    if (uri.scheme() == "docker-manifest") {
+      return Nothing();
+    }
+
+    hashset<string> digests;
+    for (int i = 0; i < manifest->fslayers_size(); i++) {
+      digests.insert(manifest->fslayers(i).blobsum());
+    }
+
+    return fetchBlobs(uri, directory, digests, authHeaders);
+  } else if (isV2Schema2) {
+    // Parse V2 schema 2 manifest.
+    Try<spec::v2_2::ImageManifest> manifest =
+      spec::v2_2::parse(response.body);
+
+    if (manifest.isError()) {
+      return Failure(
+          "Failed to parse the V2 Schema 2 image manifest: " +
+          manifest.error());
+    }
+
+    // Save manifest to 'directory'.
+    Try<Nothing> write = os::write(
+        path::join(directory, "manifest"), response.body);
+
+    if (write.isError()) {
+      return Failure(
+          "Failed to write the V2 Schema 2 image manifest to "
+          "'" + directory + "': " + write.error());
+    }
+
+    // No need to proceed if we only want manifest.
+    if (uri.scheme() == "docker-manifest") {
+      return Nothing();
+    }
+
+    hashset<string> digests{manifest->config().digest()};
+    for (int i = 0; i < manifest->layers_size(); i++) {
+      digests.insert(manifest->layers(i).digest());
+    }
+
+    // TODO(gilbert): Verify the digest after contents are fetched.
+    return fetchBlobs(uri, directory, digests, authHeaders);
+  }
+
+  return Failure("Unsupported manifest MIME type: " + contentType.get());
 }
 
 
-Future<Nothing> DockerFetcherPluginProcess::___fetch(
+Future<Nothing> DockerFetcherPluginProcess::fetchBlobs(
     const URI& uri,
     const string& directory,
-    const http::Headers& authHeaders,
-    const spec::v2::ImageManifest& manifest)
+    const hashset<string>& digests,
+    const http::Headers& authHeaders)
 {
-  // No need to proceed if we only want manifest.
-  if (uri.scheme() == "docker-manifest") {
-    return Nothing();
-  }
-
-  // Download all the filesystem layers.
   vector<Future<Nothing>> futures;
-  for (int i = 0; i < manifest.fslayers_size(); i++) {
+
+  foreach (const string& digest, digests) {
     URI blob = uri::docker::blob(
         uri.path(),                         // The 'repository'.
-        manifest.fslayers(i).blobsum(),    // The 'digest'.
+        digest,                             // The 'digest'.
         uri.host(),                         // The 'registry'.
         (uri.has_fragment()                 // The 'scheme'.
           ? Option<string>(uri.fragment())
@@ -811,11 +862,7 @@ Future<Nothing> DockerFetcherPluginProcess::___fetch(
           ? Option<int>(uri.port())
           : None()));
 
-    // Use the same 'authHeaders' as for the manifest to pull the blobs.
-    futures.push_back(fetchBlob(
-        blob,
-        directory,
-        authHeaders));
+    futures.push_back(fetchBlob(blob, directory, authHeaders));
   }
 
   return collect(futures)
@@ -823,115 +870,6 @@ Future<Nothing> DockerFetcherPluginProcess::___fetch(
 }
 
 
-Try<spec::v2::ImageManifest> DockerFetcherPluginProcess::saveV2S1Manifest(
-    const string& directory,
-    const http::Response& response)
-{
-  if (response.code != http::Status::OK) {
-    return Error(
-        "Unexpected HTTP response '" + response.status + "' "
-        "when trying to get the schema 1 manifest");
-  }
-
-  CHECK_EQ(response.type, http::Response::BODY);
-
-  // Check if we got a V2 Schema 1 manifest.
-  // TODO(ipronin): We have to support Schema 2 manifests to be able to use
-  // digests for pulling images that were pushed with Docker 1.10+ to
-  // Registry 2.3+.
-  Option<string> contentType = response.headers.get("Content-Type");
-  if (contentType.isSome()) {
-    // NOTE: Docker support the following three media type for V2
-    // schema 1 manifest:
-    // 1. application/vnd.docker.distribution.manifest.v1+json
-    // 2. application/vnd.docker.distribution.manifest.v1+prettyjws
-    // 3. application/json
-    // For more details, see:
-    // https://docs.docker.com/registry/spec/manifest-v2-1/
-    bool isV2Schema1 =
-      strings::startsWith(
-          contentType.get(),
-          "application/vnd.docker.distribution.manifest.v1") ||
-      strings::startsWith(
-          contentType.get(),
-          "application/json");
-
-    if (!isV2Schema1) {
-      return Error(
-          "Unsupported schema 1 manifest MIME type: " +
-          contentType.get());
-    }
-  }
-
-  Try<spec::v2::ImageManifest> manifest = spec::v2::parse(response.body);
-  if (manifest.isError()) {
-    return Error(
-        "Failed to parse the schema 1 image manifest: " +
-        manifest.error());
-  }
-
-  // Save manifest to 'directory'.
-  Try<Nothing> write = os::write(
-      path::join(directory, "manifest"), response.body);
-
-  if (write.isError()) {
-    return Error(
-        "Failed to write the schema 1 image manifest to '" +
-        directory + "': " + write.error());
-  }
-
-  return manifest;
-}
-
-
-Try<spec::v2_2::ImageManifest> DockerFetcherPluginProcess::saveV2S2Manifest(
-    const string& directory,
-    const http::Response& response)
-{
-  if (response.code != http::Status::OK) {
-    return Error(
-        "Unexpected HTTP response '" + response.status +
-        "' when trying to get the schema 2 manifest");
-  }
-
-  Option<string> contentType = response.headers.get("Content-Type");
-  if (contentType.isSome()) {
-    bool isV2Schema2 =
-      strings::startsWith(
-          contentType.get(),
-          "application/vnd.docker.distribution.manifest.v2") ||
-      strings::startsWith(
-          contentType.get(),
-          "application/json");
-
-    if (!isV2Schema2) {
-      return Error(
-          "Unsupported schema 2 manifest MIME type: " +
-          contentType.get());
-    }
-  }
-
-  Try<spec::v2_2::ImageManifest> manifest = spec::v2_2::parse(response.body);
-  if (manifest.isError()) {
-    return Error(
-        "Failed to parse the schema 2 manifest: " +
-        manifest.error());
-  }
-
-  // Save manifest to 'directory'.
-  Try<Nothing> write = os::write(
-      path::join(directory, "manifest_v2s2"), response.body);
-
-  if (write.isError()) {
-    return Error(
-        "Failed to write the schema 2 image manifest to '" +
-        directory + "': " + write.error());
-  }
-
-  return manifest;
-}
-
-
 Future<Nothing> DockerFetcherPluginProcess::fetchBlob(
     const URI& uri,
     const string& directory,
@@ -1025,7 +963,7 @@ Future<Nothing> DockerFetcherPluginProcess::urlFetchBlob(
       const URI& blobUri,
       const http::Headers& authHeaders)
 {
-  Try<string> _manifest = os::read(path::join(directory, "manifest_v2s2"));
+  Try<string> _manifest = os::read(path::join(directory, "manifest"));
   if (_manifest.isError()) {
     return Failure("Schema 2 manifest does not exist");
   }