You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by qi...@apache.org on 2019/04/15 10:35:02 UTC

[mesos] 03/17: Windows: Fetch blobs with V2S2 Docker image manifest.

This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 53979cc77878b546d2fa248b0a71af7beb593094
Author: Liangyu Zhao <t-...@microsoft.com>
AuthorDate: Tue Aug 28 17:08:59 2018 -0700

    Windows: Fetch blobs with V2S2 Docker image manifest.
    
    DockerFetcher now fetches both the V2S1 and V2S2 manifests and saves
    them to disk when the agent is running on Windows. The Linux code
    path in the agent is unchanged. In addition to fetching from
    DockerHub, DockerFetcher now supports fetching blobs from the
    foreign URLs provided in a V2S2 Docker image manifest.
    
    Review: https://reviews.apache.org/r/68454/
---
 src/uri/fetchers/docker.cpp | 364 ++++++++++++++++++++++++++++++++++++++------
 src/uri/fetchers/docker.hpp |   4 +
 2 files changed, 319 insertions(+), 49 deletions(-)

diff --git a/src/uri/fetchers/docker.cpp b/src/uri/fetchers/docker.cpp
index 49dfda4..c1012cb 100644
--- a/src/uri/fetchers/docker.cpp
+++ b/src/uri/fetchers/docker.cpp
@@ -315,7 +315,11 @@ static Future<int> download(
             (output.isFailed() ? output.failure() : "discarded"));
       }
 
+#ifdef __WINDOWS__
+      vector<string> tokens = strings::tokenize(output.get(), "\r\n", 2);
+#else
       vector<string> tokens = strings::tokenize(output.get(), "\n", 2);
+#endif // __WINDOWS__
       if (tokens.empty()) {
         return Failure("Unexpected 'curl' output: " + output.get());
       }
@@ -343,13 +347,25 @@ static Future<int> download(
 
 static Future<int> download(
     const URI& uri,
+    const string& url,
     const string& directory,
     const http::Headers& headers,
     const Option<Duration>& stallTimeout)
 {
-  const string blobPath = path::join(directory, Path(uri.path()).basename());
+  string blobSum;
+
+  auto lastSlash = uri.path().find_last_of('/');
+  if (lastSlash == string::npos) {
+    blobSum = uri.path();
+  } else {
+    blobSum = uri.path().substr(lastSlash + 1);
+  }
+
   return download(
-      strings::trim(stringify(uri)), blobPath, headers, stallTimeout);
+      url,
+      DockerFetcherPlugin::getBlobPath(directory, blobSum),
+      headers,
+      stallTimeout);
 }
 
 
@@ -442,6 +458,20 @@ private:
       const http::Headers& authHeaders,
       const http::Response& response);
 
+  Future<Nothing> ___fetch(
+      const URI& uri,
+      const string& directory,
+      const http::Headers& authHeaders,
+      const spec::v2::ImageManifest& manifest);
+
+  Try<spec::v2::ImageManifest> saveV2S1Manifest(
+      const string& directory,
+      const http::Response& response);
+
+  Try<spec::v2_2::ImageManifest> saveV2S2Manifest(
+      const string& directory,
+      const http::Response& response);
+
   Future<Nothing> fetchBlob(
       const URI& uri,
       const string& directory,
@@ -453,7 +483,19 @@ private:
       const URI& blobUri,
       const http::Headers& basicAuthHeaders);
 
-  Future<Nothing> __fetchBlob(int code);
+#ifdef __WINDOWS__
+  Future<Nothing> urlFetchBlob(
+      const URI& uri,
+      const string& directory,
+      const URI& blobUri,
+      const http::Headers& authHeaders);
+
+  Future<Nothing> _urlFetchBlob(
+      const string& directory,
+      const URI& blobUri,
+      const http::Headers& authHeaders,
+      vector<string> urls);
+#endif
 
   // Returns a token-based authorization header. Basic authorization
   // header may be required to get a proper authorization token.
@@ -516,6 +558,32 @@ Try<Owned<Fetcher::Plugin>> DockerFetcherPlugin::create(const Flags& flags)
 }
 
 
+string DockerFetcherPlugin::getBlobPath(
+    const string& directory,
+    const string& blobSum)
+{
+#ifdef __WINDOWS__
+  std::string path = path::join(directory, blobSum);
+
+  // The colon in disk designator is preserved.
+  auto i = 0;
+  if (path::absolute(path)) {
+    i = path.find_first_of(':') + 1;
+  }
+
+  for (; i < path.size(); ++i) {
+    if (path[i] == ':') {
+      path[i] = '_';
+    }
+  }
+
+  return path;
+#else
+  return path::join(directory, blobSum);
+#endif
+}
+
+
 DockerFetcherPlugin::DockerFetcherPlugin(
     Owned<DockerFetcherPluginProcess> _process)
   : process(_process)
@@ -667,10 +735,94 @@ Future<Nothing> DockerFetcherPluginProcess::__fetch(
     const http::Headers& authHeaders,
     const http::Response& response)
 {
+  Try<spec::v2::ImageManifest> manifest =
+      saveV2S1Manifest(directory, response);
+
+  if (manifest.isError()) {
+    return Failure(manifest.error());
+  }
+
+#ifdef __WINDOWS__
+  URI manifestUri = getManifestUri(uri);
+
+  // Fetching version 2 schema 2 manifest:
+  // https://docs.docker.com/registry/spec/manifest-v2-2/
+  //
+  // If fetch is failed, program continues without schema 2 manifest.
+  http::Headers s2ManifestHeaders = {
+    {"Accept", "application/vnd.docker.distribution.manifest.v2+json"}
+  };
+
+  return curl(manifestUri, s2ManifestHeaders + authHeaders, stallTimeout)
+      .then(defer(self(), [=](const http::Response& response)
+          -> Future<Nothing> {
+        Try<spec::v2_2::ImageManifest> manifest =
+            saveV2S2Manifest(directory, response);
+
+        if (manifest.isError()) {
+          LOG(WARNING) << "Failed to fetch schema 2 manifest: "
+                       << manifest.error();
+        }
+
+        return Nothing();
+      }))
+      .then(defer(self(),
+                  &Self::___fetch,
+                  uri,
+                  directory,
+                  authHeaders,
+                  manifest.get()));
+#else
+  return ___fetch(uri, directory, authHeaders, manifest.get());
+#endif
+}
+
+
+Future<Nothing> DockerFetcherPluginProcess::___fetch(
+    const URI& uri,
+    const string& directory,
+    const http::Headers& authHeaders,
+    const spec::v2::ImageManifest& manifest)
+{
+  // No need to proceed if we only want manifest.
+  if (uri.scheme() == "docker-manifest") {
+    return Nothing();
+  }
+
+  // Download all the filesystem layers.
+  vector<Future<Nothing>> futures;
+  for (int i = 0; i < manifest.fslayers_size(); i++) {
+    URI blob = uri::docker::blob(
+        uri.path(),                         // The 'repository'.
+        manifest.fslayers(i).blobsum(),    // The 'digest'.
+        uri.host(),                         // The 'registry'.
+        (uri.has_fragment()                 // The 'scheme'.
+          ? Option<string>(uri.fragment())
+          : None()),
+        (uri.has_port()                     // The 'port'.
+          ? Option<int>(uri.port())
+          : None()));
+
+    // Use the same 'authHeaders' as for the manifest to pull the blobs.
+    futures.push_back(fetchBlob(
+        blob,
+        directory,
+        authHeaders));
+  }
+
+  return collect(futures)
+    .then([]() -> Future<Nothing> { return Nothing(); });
+}
+
+
+Try<spec::v2::ImageManifest> DockerFetcherPluginProcess::saveV2S1Manifest(
+    const string& directory,
+    const http::Response& response)
+{
   if (response.code != http::Status::OK) {
-    return Failure(
+    return Error(
         "Unexpected HTTP response '" + response.status + "' "
-        "when trying to get the manifest");
+        "when trying to get the schema 1 manifest");
   }
 
   CHECK_EQ(response.type, http::Response::BODY);
@@ -697,54 +849,78 @@ Future<Nothing> DockerFetcherPluginProcess::__fetch(
           "application/json");
 
     if (!isV2Schema1) {
-      return Failure("Unsupported manifest MIME type: " + contentType.get());
+      return Error(
+          "Unsupported schema 1 manifest MIME type: " +
+          contentType.get());
     }
   }
 
   Try<spec::v2::ImageManifest> manifest = spec::v2::parse(response.body);
   if (manifest.isError()) {
-    return Failure("Failed to parse the image manifest: " + manifest.error());
+    return Error(
+        "Failed to parse the schema 1 image manifest: " +
+        manifest.error());
   }
 
   // Save manifest to 'directory'.
   Try<Nothing> write = os::write(
-      path::join(directory, "manifest"),
-      response.body);
+      path::join(directory, "manifest"), response.body);
 
   if (write.isError()) {
-    return Failure(
-        "Failed to write the image manifest to "
-        "'" + directory + "': " + write.error());
+    return Error(
+        "Failed to write the schema 1 image manifest to '" +
+        directory + "': " + write.error());
   }
 
-  // No need to proceed if we only want manifest.
-  if (uri.scheme() == "docker-manifest") {
-    return Nothing();
+  return manifest;
+}
+
+
+Try<spec::v2_2::ImageManifest> DockerFetcherPluginProcess::saveV2S2Manifest(
+    const string& directory,
+    const http::Response& response)
+{
+  if (response.code != http::Status::OK) {
+    return Error(
+        "Unexpected HTTP response '" + response.status +
+        "' when trying to get the schema 2 manifest");
   }
 
-  // Download all the filesystem layers.
-  vector<Future<Nothing>> futures;
-  for (int i = 0; i < manifest->fslayers_size(); i++) {
-    URI blob = uri::docker::blob(
-        uri.path(),                         // The 'repository'.
-        manifest->fslayers(i).blobsum(),    // The 'digest'.
-        uri.host(),                         // The 'registry'.
-        (uri.has_fragment()                 // The 'scheme'.
-          ? Option<string>(uri.fragment())
-          : None()),
-        (uri.has_port()                     // The 'port'.
-          ? Option<int>(uri.port())
-          : None()));
+  Option<string> contentType = response.headers.get("Content-Type");
+  if (contentType.isSome()) {
+    bool isV2Schema2 =
+      strings::startsWith(
+          contentType.get(),
+          "application/vnd.docker.distribution.manifest.v2") ||
+      strings::startsWith(
+          contentType.get(),
+          "application/json");
 
-    // Use the same 'authHeaders' as for the manifest to pull the blobs.
-    futures.push_back(fetchBlob(
-        blob,
-        directory,
-        authHeaders));
+    if (!isV2Schema2) {
+      return Error(
+          "Unsupported schema 2 manifest MIME type: " +
+          contentType.get());
+    }
   }
 
-  return collect(futures)
-    .then([]() -> Future<Nothing> { return Nothing(); });
+  Try<spec::v2_2::ImageManifest> manifest = spec::v2_2::parse(response.body);
+  if (manifest.isError()) {
+    return Error(
+        "Failed to parse the schema 2 manifest: " +
+        manifest.error());
+  }
+
+  // Save manifest to 'directory'.
+  Try<Nothing> write = os::write(
+      path::join(directory, "manifest_v2s2"), response.body);
+
+  if (write.isError()) {
+    return Error(
+        "Failed to write the schema 2 image manifest to '" +
+        directory + "': " + write.error());
+  }
+
+  return manifest;
 }
 
 
@@ -755,7 +931,12 @@ Future<Nothing> DockerFetcherPluginProcess::fetchBlob(
 {
   URI blobUri = getBlobUri(uri);
 
-  return download(blobUri, directory, authHeaders, stallTimeout)
+  return download(
+      blobUri,
+      strings::trim(stringify(blobUri)),
+      directory,
+      authHeaders,
+      stallTimeout)
     .then(defer(self(), [=](int code) -> Future<Nothing> {
       if (code == http::Status::UNAUTHORIZED) {
         // If we get a '401 Unauthorized', we assume that 'authHeaders'
@@ -765,7 +946,17 @@ Future<Nothing> DockerFetcherPluginProcess::fetchBlob(
         return _fetchBlob(uri, directory, blobUri, authHeaders);
       }
 
-      return __fetchBlob(code);
+      if (code == http::Status::OK) {
+        return Nothing();
+      }
+
+#ifdef __WINDOWS__
+      return urlFetchBlob(uri, directory, blobUri, authHeaders);
+#else
+      return Failure(
+          "Unexpected HTTP response '" + http::Status::string(code) + "' "
+          "when trying to download the blob");
+#endif
     }));
 }
 
@@ -793,25 +984,100 @@ Future<Nothing> DockerFetcherPluginProcess::_fetchBlob(
       return getAuthHeader(blobUri, basicAuthHeaders, response)
         .then(defer(self(), [=](
             const http::Headers& authHeaders) -> Future<Nothing> {
-          return download(blobUri, directory, authHeaders, stallTimeout)
-            .then(defer(self(),
-                        &Self::__fetchBlob,
-                        lambda::_1));
+          return download(
+              blobUri,
+              strings::trim(stringify(blobUri)),
+              directory,
+              authHeaders,
+              stallTimeout)
+            .then(defer(self(), [=](int code) -> Future<Nothing> {
+              if (code == http::Status::OK) {
+                return Nothing();
+              }
+
+#ifdef __WINDOWS__
+              return urlFetchBlob(uri, directory, blobUri, authHeaders);
+#else
+              return Failure(
+                  "Unexpected HTTP response '" + http::Status::string(code) +
+                  "' when trying to download blob '" +
+                  strings::trim(stringify(blobUri)) +
+                  "' with schema 1 manifest");
+#endif
+            }));
         }));
     }));
 }
 
 
-Future<Nothing> DockerFetcherPluginProcess::__fetchBlob(int code)
+#ifdef __WINDOWS__
+Future<Nothing> DockerFetcherPluginProcess::urlFetchBlob(
+      const URI& uri,
+      const string& directory,
+      const URI& blobUri,
+      const http::Headers& authHeaders)
 {
-  if (code == http::Status::OK) {
-    return Nothing();
+  Try<string> _manifest = os::read(path::join(directory, "manifest_v2s2"));
+  if (_manifest.isError()) {
+    return Failure("Schema 2 manifest does not exist");
+  }
+
+  Try<spec::v2_2::ImageManifest> manifest = spec::v2_2::parse(_manifest.get());
+  if (manifest.isError()) {
+    return Failure(
+        "Failed to parse the schema 2 manifest: " +
+        manifest.error());
+  }
+
+  const string& blobsum = uri.query(); // blobsum or digest of blob
+  vector<string> urls;
+  for (int i = 0; i < manifest->layers_size(); i++) {
+    if (blobsum != manifest->layers(i).digest()) {
+      continue;
+    }
+    for (int j = 0; j < manifest->layers(i).urls_size(); j++) {
+      urls.emplace_back(manifest->layers(i).urls(j));
+    }
+    break;
+  }
+
+  if (urls.empty()) {
+    return Failure("No foreign url found from schema 2 manifest");
+  }
+
+  return _urlFetchBlob(directory, blobUri, authHeaders, urls);
+}
+
+
+Future<Nothing> DockerFetcherPluginProcess::_urlFetchBlob(
+      const string& directory,
+      const URI& blobUri,
+      const http::Headers& authHeaders,
+      vector<string> urls)
+{
+  if (urls.empty()) {
+    return Failure("Failed to fetch with foreign urls");
   }
 
-  return Failure(
-      "Unexpected HTTP response '" + http::Status::string(code) + "' "
-      "when trying to download the blob");
+  string url = urls.back();
+  urls.pop_back();
+  return download(blobUri, url, directory, authHeaders, stallTimeout)
+      .then(defer(self(), [=](int code) -> Future<Nothing> {
+        if (code == http::Status::OK) {
+          return Nothing();
+        }
+
+        LOG(WARNING) << "Unexpected HTTP response '"
+                      << http::Status::string(code)
+                      << "' when trying to download blob '"
+                      << strings::trim(stringify(blobUri))
+                      << "' from '" << url
+                      << "' in schema 2 manifest";
+
+        return _urlFetchBlob(directory, blobUri, authHeaders, urls);
+      }));
 }
+#endif
 
 
 Future<http::Headers> DockerFetcherPluginProcess::getAuthHeader(
@@ -911,7 +1177,7 @@ URI DockerFetcherPluginProcess::getManifestUri(const URI& uri)
 
   return uri::construct(
       scheme,
-      path::join("/v2", uri.path(), "manifests", uri.query()),
+      strings::join("/", "/v2", uri.path(), "manifests", uri.query()),
       uri.host(),
       (uri.has_port() ? Option<int>(uri.port()) : None()));
 }
@@ -926,7 +1192,7 @@ URI DockerFetcherPluginProcess::getBlobUri(const URI& uri)
 
   return uri::construct(
       scheme,
-      path::join("/v2", uri.path(), "blobs", uri.query()),
+      strings::join("/", "/v2", uri.path(), "blobs", uri.query()),
       uri.host(),
       (uri.has_port() ? Option<int>(uri.port()) : None()));
 }
diff --git a/src/uri/fetchers/docker.hpp b/src/uri/fetchers/docker.hpp
index 7a6193d..2abe735 100644
--- a/src/uri/fetchers/docker.hpp
+++ b/src/uri/fetchers/docker.hpp
@@ -47,6 +47,10 @@ public:
 
   static Try<process::Owned<Fetcher::Plugin>> create(const Flags& flags);
 
+  static std::string getBlobPath(
+      const std::string& directory,
+      const std::string& blobSum);
+
   ~DockerFetcherPlugin() override;
 
   std::set<std::string> schemes() const override;