You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by tn...@apache.org on 2015/09/14 22:34:24 UTC

[1/2] mesos git commit: Added registry client for Docker provisioner.

Repository: mesos
Updated Branches:
  refs/heads/master c80192ffd -> d7b7a53c1


Added registry client for Docker provisioner.

Review: https://reviews.apache.org/r/37773


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/34750ceb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/34750ceb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/34750ceb

Branch: refs/heads/master
Commit: 34750cebf20233c0695bc4d2d57463188d861f19
Parents: c80192f
Author: Jojy Varghese <jo...@mesosphere.io>
Authored: Wed Sep 9 16:59:12 2015 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Mon Sep 14 13:34:02 2015 -0700

----------------------------------------------------------------------
 docs/persistent-volume.md                       |   2 +-
 src/Makefile.am                                 |   2 +
 .../provisioners/docker/registry_client.cpp     | 540 +++++++++++++++++++
 .../provisioners/docker/registry_client.hpp     | 163 ++++++
 .../provisioners/docker_provisioner_tests.cpp   | 303 ++++++++++-
 5 files changed, 994 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/docs/persistent-volume.md
----------------------------------------------------------------------
diff --git a/docs/persistent-volume.md b/docs/persistent-volume.md
index 0f66442..ae5b0e5 100644
--- a/docs/persistent-volume.md
+++ b/docs/persistent-volume.md
@@ -229,7 +229,7 @@ reserved disk resources:
 
 Note that in 0.23, even after you destroy the persistent volume, its content
 will still be on the disk. The garbage collection for persistent volumes is
-coming soon: [MESOS-2408](https://issues.apache.org/jira/browse/MESOS-2408).
+coming soon: [MESOS-2408](https://issues.apache.org/jira/browse/MESOS-2408).
 
 
 ### `/create` (_Coming Soon_)

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bb77c2d..8c46539 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -489,6 +489,7 @@ libmesos_no_3rdparty_la_SOURCES =					\
 	slave/containerizer/provisioners/appc/store.cpp			\
 	slave/containerizer/provisioners/backend.cpp			\
 	slave/containerizer/provisioners/backends/copy.cpp              \
+	slave/containerizer/provisioners/docker/registry_client.cpp	\
 	slave/containerizer/provisioners/docker/token_manager.cpp	\
 	slave/resource_estimators/noop.cpp				\
 	usage/usage.cpp							\
@@ -772,6 +773,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	slave/containerizer/provisioners/backend.hpp			\
 	slave/containerizer/provisioners/backends/bind.hpp		\
 	slave/containerizer/provisioners/backends/copy.hpp		\
+	slave/containerizer/provisioners/docker/registry_client.hpp	\
 	slave/containerizer/provisioners/docker/token_manager.hpp	\
 	slave/containerizer/isolators/posix.hpp				\
 	slave/containerizer/isolators/posix/disk.hpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/slave/containerizer/provisioners/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.cpp b/src/slave/containerizer/provisioners/docker/registry_client.cpp
new file mode 100644
index 0000000..fce0563
--- /dev/null
+++ b/src/slave/containerizer/provisioners/docker/registry_client.cpp
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+
+#include "slave/containerizer/provisioners/docker/registry_client.hpp"
+#include "slave/containerizer/provisioners/docker/token_manager.hpp"
+
+using std::string;
+using std::vector;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+
+using process::http::Request;
+using process::http::Response;
+using process::http::URL;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Shorthand for the nested result types used throughout this file.
+using FileSystemLayerInfo = RegistryClient::FileSystemLayerInfo;
+
+using ManifestResponse = RegistryClient::ManifestResponse;
+
+// Timeout applied by RegistryClient::getManifest() and
+// RegistryClient::getBlob() when the caller passes None().
+const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10);
+
+// Value substituted by RegistryClient::getBlob() when the caller passes
+// None() for the maximum size.
+const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096;
+
+// Port assumed for a redirect location that does not name one explicitly.
+static const uint16_t DEFAULT_SSL_PORT = 443;
+
+/**
+ * Libprocess actor that carries out the registry requests. RegistryClient
+ * spawns one instance and dispatches getManifest()/getBlob() calls to it.
+ */
+class RegistryClientProcess : public Process<RegistryClientProcess>
+{
+public:
+  /**
+   * Creates the process together with the token manager it uses.
+   *
+   * @param authServer URL handed to TokenManager::create().
+   * @param registry URL of the registry server that requests are sent to.
+   * @param creds Optional credentials for the client session.
+   * @return The process on success, Error if the token manager could not be
+   *     created.
+   */
+  static Try<Owned<RegistryClientProcess>> create(
+      const URL& authServer,
+      const URL& registry,
+      const Option<RegistryClient::Credentials>& creds);
+
+  /**
+   * Fetches and parses the manifest of `path` at `tag` ("latest" if None).
+   */
+  Future<RegistryClient::ManifestResponse> getManifest(
+      const string& path,
+      const Option<string>& tag,
+      const Duration& timeout);
+
+  /**
+   * Fetches the blob identified by `digest` and writes it to `filePath`.
+   * Returns the number of bytes in the response body.
+   */
+  Future<size_t> getBlob(
+      const string& path,
+      const Option<string>& digest,
+      const Path& filePath,
+      const Duration& timeout,
+      size_t maxSize);
+
+private:
+  RegistryClientProcess(
+      const Owned<TokenManager>& tokenMgr,
+      const URL& registryServer,
+      const Option<RegistryClient::Credentials>& creds);
+
+  /**
+   * Issues an HTTP GET, following up on "401 Unauthorized" and
+   * "307 Temporary Redirect" responses when `resend` is set.
+   * `lastResponseStatus` is the status that triggered this call, if any; it
+   * is used to stop when the same status is received twice in a row.
+   */
+  Future<Response> doHttpGet(
+      const URL& url,
+      const Option<hashmap<string, string>>& headers,
+      const Duration& timeout,
+      bool resend,
+      const Option<string>& lastResponseStatus) const;
+
+  /**
+   * Extracts the key/value attributes of a "Bearer" WWW-Authenticate header.
+   */
+  Try<hashmap<string, string>> getAuthenticationAttributes(
+      const Response& httpResponse) const;
+
+  Owned<TokenManager> tokenManager_;
+  const URL registryServer_;
+  const Option<RegistryClient::Credentials> credentials_;
+
+  // Not copyable.
+  RegistryClientProcess(const RegistryClientProcess&) = delete;
+  RegistryClientProcess& operator = (const RegistryClientProcess&) = delete;
+};
+
+
+// Builds the backing process first; the client itself is only constructed
+// once that succeeded.
+Try<Owned<RegistryClient>> RegistryClient::create(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<Credentials>& creds)
+{
+  Try<Owned<RegistryClientProcess>> actor =
+    RegistryClientProcess::create(authServer, registryServer, creds);
+
+  if (actor.isSome()) {
+    RegistryClient* client =
+      new RegistryClient(authServer, registryServer, creds, actor.get());
+
+    return Owned<RegistryClient>(client);
+  }
+
+  return Error(actor.error());
+}
+
+
+// Stores the configuration and starts the backing process.
+RegistryClient::RegistryClient(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<Credentials>& creds,
+    const Owned<RegistryClientProcess>& process)
+  : authServer_(authServer),
+    registryServer_(registryServer),
+    credentials_(creds),
+    process_(process)
+{
+  // A null process is a programming error; fail loudly before spawning.
+  RegistryClientProcess* actor = CHECK_NOTNULL(process_.get());
+  spawn(actor);
+}
+
+
+// Stops the backing process and blocks until it has exited.
+RegistryClient::~RegistryClient()
+{
+  RegistryClientProcess* actor = process_.get();
+
+  terminate(actor);
+  process::wait(actor);
+}
+
+
+// Forwards the request to the backing process, substituting the default
+// timeout when none was given.
+Future<ManifestResponse> RegistryClient::getManifest(
+    const string& _path,
+    const Option<string>& _tag,
+    const Option<Duration>& _timeout)
+{
+  Duration effectiveTimeout = DEFAULT_MANIFEST_TIMEOUT_SECS;
+  if (_timeout.isSome()) {
+    effectiveTimeout = _timeout.get();
+  }
+
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getManifest,
+      _path,
+      _tag,
+      effectiveTimeout);
+}
+
+
+// Forwards the request to the backing process, substituting the default
+// timeout and maximum size for any argument that was not given.
+Future<size_t> RegistryClient::getBlob(
+    const string& _path,
+    const Option<string>& _digest,
+    const Path& _filePath,
+    const Option<Duration>& _timeout,
+    const Option<size_t>& _maxSize)
+{
+  Duration effectiveTimeout = DEFAULT_MANIFEST_TIMEOUT_SECS;
+  if (_timeout.isSome()) {
+    effectiveTimeout = _timeout.get();
+  }
+
+  size_t effectiveMaxSize = DEFAULT_MANIFEST_MAXSIZE_BYTES;
+  if (_maxSize.isSome()) {
+    effectiveMaxSize = _maxSize.get();
+  }
+
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getBlob,
+      _path,
+      _digest,
+      _filePath,
+      effectiveTimeout,
+      effectiveMaxSize);
+}
+
+
+// Creates the token manager for `authServer` and wraps it, together with
+// the registry URL and credentials, in a new process.
+Try<Owned<RegistryClientProcess>> RegistryClientProcess::create(
+    const URL& authServer,
+    const URL& registryServer,
+    const Option<RegistryClient::Credentials>& creds)
+{
+  Try<Owned<TokenManager>> manager = TokenManager::create(authServer);
+
+  if (manager.isSome()) {
+    RegistryClientProcess* actor =
+      new RegistryClientProcess(manager.get(), registryServer, creds);
+
+    return Owned<RegistryClientProcess>(actor);
+  }
+
+  return Error("Failed to create token manager: " + manager.error());
+}
+
+
+// Only records its arguments; no work happens at construction time.
+RegistryClientProcess::RegistryClientProcess(
+    const Owned<TokenManager>& tokenMgr,
+    const URL& registryServer,
+    const Option<RegistryClient::Credentials>& creds)
+  : tokenManager_(tokenMgr),
+    registryServer_(registryServer),
+    credentials_(creds)
+{
+}
+
+
+// Parses a header of the form
+//   WWW-Authenticate: Bearer key1="value1",key2="value2",...
+// into a key -> value map. Any other shape is reported as an Error.
+Try<hashmap<string, string>>
+RegistryClientProcess::getAuthenticationAttributes(
+    const Response& httpResponse) const
+{
+  auto header = httpResponse.headers.find("WWW-Authenticate");
+  if (header == httpResponse.headers.end()) {
+    return Error("Failed to find WWW-Authenticate header value");
+  }
+
+  const string& challenge = header->second;
+
+  // Expect exactly "<scheme> <parameters>" with the scheme being "Bearer".
+  const vector<string> challengeParts = strings::tokenize(challenge, " ");
+  const bool isBearer =
+    (challengeParts.size() == 2) && (challengeParts[0] == "Bearer");
+
+  if (!isBearer) {
+    // TODO(jojy): Look at various possibilities of auth response. We currently
+    // assume that the string will have realm information.
+    return Error("Invalid authentication header value: " + challenge);
+  }
+
+  const vector<string> params = strings::tokenize(challengeParts[1], ",");
+
+  hashmap<string, string> attributes;
+
+  foreach (const string& param, params) {
+    // Both '=' and '"' act as delimiters, so key="value" yields two tokens.
+    const vector<string> keyValue = strings::tokenize(param, "=\"");
+
+    if (keyValue.size() != 2) {
+      return Error(
+          "Failed to get authentication attribute from response parameter " +
+          param);
+    }
+
+    attributes.insert({keyValue[0], keyValue[1]});
+  }
+
+  return attributes;
+}
+
+
+/**
+ * Issues an HTTP GET and follows up on the two non-OK responses this client
+ * understands:
+ *   - "401 Unauthorized": obtains a bearer token from the token manager using
+ *     the attributes of the WWW-Authenticate header, then repeats the request
+ *     with an Authorization header.
+ *   - "307 Temporary Redirect": repeats the request against the URL in the
+ *     Location header, with no further follow-up allowed.
+ *
+ * @param url URL to fetch.
+ * @param headers Optional request headers.
+ * @param timeout Applied separately to each HTTP request and to the token
+ *     request.
+ * @param resend Whether a non-OK response may be followed up at all.
+ * @param lastResponseStatus Status of the response that triggered this call,
+ *     if any; receiving the same status again ends the chain.
+ * @return The "200 OK" response, or a Failure describing what went wrong.
+ */
+Future<Response>
+RegistryClientProcess::doHttpGet(
+    const URL& url,
+    const Option<hashmap<string, string>>& headers,
+    const Duration& timeout,
+    bool resend,
+    const Option<string>& lastResponseStatus) const
+{
+  return process::http::get(url, headers)
+    .after(timeout, [](
+        const Future<Response>& httpResponseFuture) -> Future<Response> {
+      return Failure("Response timeout");
+    })
+    .then(defer(self(), [=](
+        const Response& httpResponse) -> Future<Response> {
+      VLOG(1) << "Response status: " + httpResponse.status;
+
+      // Set the future if we get a OK response.
+      if (httpResponse.status == "200 OK") {
+        return httpResponse;
+      }
+
+      // Prevent infinite recursion.
+      if (lastResponseStatus.isSome() &&
+          (lastResponseStatus.get() == httpResponse.status)) {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+
+      // If resend is not set, we dont try again and stop here.
+      if (!resend) {
+        return Failure("Bad response: " + httpResponse.status);
+      }
+
+      // Handle 401 Unauthorized.
+      if (httpResponse.status == "401 Unauthorized") {
+        Try<hashmap<string, string>> authAttributes =
+          getAuthenticationAttributes(httpResponse);
+
+        if (authAttributes.isError()) {
+          return Failure(
+              "Failed to get authentication attributes: " +
+              authAttributes.error());
+        }
+
+        // Both attributes are required for the token request. Looking them
+        // up with at() without this check would throw on a challenge that
+        // omits either one.
+        const hashmap<string, string>& attributes = authAttributes.get();
+        if (!attributes.contains("service") ||
+            !attributes.contains("scope")) {
+          return Failure(
+              "Failed to get authentication attributes: 'service' and "
+              "'scope' are required");
+        }
+
+        // TODO(jojy): Currently only handling TLS/cert authentication.
+        Future<Token> tokenResponse = tokenManager_->getToken(
+          attributes.at("service"),
+          attributes.at("scope"),
+          None());
+
+        return tokenResponse
+          .after(timeout, [=](
+              Future<Token> tokenResponse) -> Future<Token> {
+            tokenResponse.discard();
+            return Failure("Token response timeout");
+          })
+          .then(defer(self(), [=](
+              const Future<Token>& tokenResponse) {
+            // Send request with acquired token.
+            hashmap<string, string> authHeaders = {
+              {"Authorization", "Bearer " + tokenResponse.get().raw}
+            };
+
+            return doHttpGet(
+                url,
+                authHeaders,
+                timeout,
+                true,
+                httpResponse.status);
+        }));
+      } else if (httpResponse.status == "307 Temporary Redirect") {
+        // Handle redirect.
+
+        // TODO(jojy): Add redirect functionality in http::get.
+
+        auto toURL = [](
+            const string& urlString) -> Try<URL> {
+          // TODO(jojy): Need to add functionality to URL class that parses a
+          // string to its URL components. For now, assuming:
+          //  - scheme is https
+          //  - path always ends with /
+
+          static const string schemePrefix = "https://";
+
+          // The prefix has to lead the string: the remainder is sliced off
+          // by the prefix length, so a match anywhere else would be wrong.
+          if (!strings::startsWith(urlString, schemePrefix)) {
+            return Error(
+                "Failed to find expected token '" + schemePrefix +
+                "' in redirect url");
+          }
+
+          const string schemeSuffix = urlString.substr(schemePrefix.length());
+
+          // Everything up to the first '/' is "host[:port]"; the rest
+          // (including that '/') is the path.
+          const size_t slash = schemeSuffix.find('/');
+          const string authority = schemeSuffix.substr(0, slash);
+          const string path =
+            (slash == string::npos) ? "" : schemeSuffix.substr(slash);
+
+          if (authority.empty()) {
+            return Error("Failed to find host in redirect url");
+          }
+
+          const vector<string> addrComponents =
+            strings::tokenize(authority, ":");
+
+          uint16_t port = DEFAULT_SSL_PORT;
+          string domain = authority;
+
+          // Parse the port.
+          if (addrComponents.size() == 2) {
+            domain = addrComponents[0];
+
+            Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]);
+            if (tryPort.isError()) {
+              return Error(
+                  "Failed to parse location: " + urlString + " for port.");
+            }
+
+            port = tryPort.get();
+          }
+
+          return URL("https", domain, port, path);
+        };
+
+        if (httpResponse.headers.find("Location") ==
+            httpResponse.headers.end()) {
+          return Failure(
+              "Invalid redirect response: 'Location' not found in headers.");
+        }
+
+        const string& location = httpResponse.headers.at("Location");
+        Try<URL> tryUrl = toURL(location);
+        if (tryUrl.isError()) {
+          return Failure(
+              "Failed to parse '" + location + "': " + tryUrl.error());
+        }
+
+        // The redirect target is requested once, with resend disabled.
+        return doHttpGet(
+            tryUrl.get(),
+            headers,
+            timeout,
+            false,
+            httpResponse.status);
+      } else {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+    }));
+}
+
+
+/**
+ * Requests "v2/<path>/manifests/<tag>" from the registry server and turns
+ * the response into a ManifestResponse.
+ *
+ * @param path Repository path; must not contain spaces.
+ * @param tag Repository tag, "latest" if None; must not contain spaces.
+ * @param timeout Timeout handed to doHttpGet().
+ * @return The parsed manifest, or a Failure if the request failed or the
+ *     response is missing the "Docker-Content-Digest" header or a well-formed
+ *     "name" / "fsLayers" / "blobSum" entry.
+ */
+Future<ManifestResponse> RegistryClientProcess::getManifest(
+    const string& path,
+    const Option<string>& tag,
+    const Duration& timeout)
+{
+  // TODO(jojy): These validations belong in the URL class.
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  string repoTag = tag.getOrElse("latest");
+  if (strings::contains(repoTag, " ")) {
+    return Failure("Invalid repository tag: " + repoTag);
+  }
+
+  URL manifestURL(registryServer_);
+  manifestURL.path =
+    "v2/" + path + "/manifests/" + repoTag;
+
+  auto getManifestResponse = [](
+      const Response& httpResponse) -> Try<ManifestResponse> {
+    Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(httpResponse.body);
+
+    if (responseJSON.isError()) {
+      return Error(responseJSON.error());
+    }
+
+    if (!httpResponse.headers.contains("Docker-Content-Digest")) {
+      return Error("Docker-Content-Digest header missing in response");
+    }
+
+    // A lookup Result may also be an error, not only None, so every lookup
+    // below is tested with isSome() before get() is called on it.
+    Result<JSON::String> name = responseJSON.get().find<JSON::String>("name");
+    if (!name.isSome()) {
+      return Error("Failed to find \"name\" in manifest response");
+    }
+
+    Result<JSON::Array> fsLayers =
+      responseJSON.get().find<JSON::Array>("fsLayers");
+
+    if (!fsLayers.isSome()) {
+      return Error("Failed to find \"fsLayers\" in manifest response");
+    }
+
+    vector<FileSystemLayerInfo> fsLayerInfoList;
+    foreach (const JSON::Value& layer, fsLayers.get().values) {
+      // Every layer entry has to be an object before it can be queried.
+      if (!layer.is<JSON::Object>()) {
+        return Error("Failed to find \"blobSum\" in manifest response");
+      }
+
+      const JSON::Object& layerInfoJSON = layer.as<JSON::Object>();
+      Result<JSON::String> blobSumInfo =
+        layerInfoJSON.find<JSON::String>("blobSum");
+
+      if (!blobSumInfo.isSome()) {
+        return Error("Failed to find \"blobSum\" in manifest response");
+      }
+
+      fsLayerInfoList.emplace_back(
+          FileSystemLayerInfo{blobSumInfo.get().value});
+    }
+
+    return ManifestResponse {
+      name.get().value,
+      httpResponse.headers.at("Docker-Content-Digest"),
+      fsLayerInfoList,
+    };
+  };
+
+  return doHttpGet(manifestURL, None(), timeout, true, None())
+    .then([getManifestResponse](
+        const Response& httpResponse) -> Future<ManifestResponse> {
+      Try<ManifestResponse> manifestResponse =
+        getManifestResponse(httpResponse);
+
+      if (manifestResponse.isError()) {
+        return Failure(
+            "Failed to parse manifest response: " + manifestResponse.error());
+      }
+
+      return manifestResponse.get();
+    });
+}
+
+
+/**
+ * Requests "v2/<path>/blobs/<digest>" from the registry server and writes
+ * the response body to `filePath`, creating the parent directory first.
+ *
+ * @param path Repository path; must not contain spaces.
+ * @param digest Digest of the blob; an empty string is used if None.
+ * @param filePath File the blob is written to.
+ * @param timeout Timeout handed to doHttpGet().
+ * @param maxSize Currently unused (see the TODO below).
+ * @return Length of the response body on success, Failure otherwise.
+ */
+Future<size_t> RegistryClientProcess::getBlob(
+    const string& path,
+    const Option<string>& digest,
+    const Path& filePath,
+    const Duration& timeout,
+    size_t maxSize)
+{
+  // Validate the input before touching the file system, so that a bad
+  // repository path does not leave an empty directory behind.
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  // TODO(jojy): Return more state, for example - if the directory is new.
+  // TODO(jojy): This currently leaves a residue in failure cases. Would be
+  // ideal if we can completely rollback.
+  Try<Nothing> dirResult = os::mkdir(filePath.dirname(), true);
+  if (dirResult.isError()) {
+    return Failure(
+        "Failed to create directory to download blob: " +
+        dirResult.error());
+  }
+
+  URL blobURL(registryServer_);
+  blobURL.path =
+    "v2/" + path + "/blobs/" + digest.getOrElse("");
+
+  // `filePath` is captured by value: the callback runs after this function
+  // has returned.
+  auto saveBlob = [filePath](
+      const Response& httpResponse) -> Try<size_t> {
+    Try<Nothing> writeResult =
+      os::write(filePath, httpResponse.body);
+
+    // TODO(jojy): Add verification step.
+    // TODO(jojy): Add check for max size.
+
+    if (writeResult.isError()) {
+      return Error(writeResult.error());
+    }
+
+    return httpResponse.body.length();
+  };
+
+  return doHttpGet(blobURL, None(), timeout, true, None())
+    .then([saveBlob](
+        const Response& httpResponse) -> Future<size_t> {
+      Try<size_t> blobSaved = saveBlob(httpResponse);
+      if (blobSaved.isError()) {
+        return Failure("Failed to save blob: " + blobSaved.error());
+      }
+
+      return blobSaved.get();
+    });
+}
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/slave/containerizer/provisioners/docker/registry_client.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.hpp b/src/slave/containerizer/provisioners/docker/registry_client.hpp
new file mode 100644
index 0000000..184ca0f
--- /dev/null
+++ b/src/slave/containerizer/provisioners/docker/registry_client.hpp
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__
+#define __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__
+
+#include <string>
+#include <vector>
+
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/path.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Forward declarations.
+class RegistryClientProcess;
+
+
+/**
+ * Client for a docker registry server. The requests themselves are carried
+ * out by a RegistryClientProcess owned by this object.
+ */
+class RegistryClient
+{
+public:
+  /**
+   * Encapsulates information about a file system layer.
+   */
+  struct FileSystemLayerInfo {
+    // TODO(jojy): This string includes the checksum type also now. Need to
+    // separate this into checksum method and checksum.
+    std::string checksumInfo;
+  };
+
+  /**
+   * Encapsulates response of "GET Manifest" request.
+   *
+   * Reference: https://docs.docker.com/registry/spec/api
+   */
+  struct ManifestResponse {
+    const std::string name;
+    const std::string digest;
+    const std::vector<FileSystemLayerInfo> fsLayerInfoList;
+  };
+
+  /**
+   * Encapsulates auth credentials for the client sessions.
+   * TODO(jojy): Secure heap to protect the credentials.
+   */
+  struct Credentials {
+    /**
+     * UserId for basic authentication.
+     */
+    const Option<std::string> userId;
+    /**
+     * Password for basic authentication.
+     */
+    const Option<std::string> password;
+    /**
+     * Account for fetching data from registry.
+     */
+    const Option<std::string> account;
+  };
+
+  /**
+   * Factory method for creating RegistryClient objects.
+   *
+   * @param authServer URL of authorization server.
+   * @param registryServer URL of docker registry server.
+   * @param credentials credentials for client session (optional).
+   * @return RegistryClient on Success.
+   *         Error on failure.
+   */
+  static Try<process::Owned<RegistryClient>> create(
+      const process::http::URL& authServer,
+      const process::http::URL& registryServer,
+      const Option<Credentials>& credentials);
+
+  /**
+   * Fetches manifest for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param tag unique tag that identifies the repository. Will default to
+   *    latest.
+   * @param timeout Maximum time after which the request will timeout and
+   *    return a failure. Will default to DEFAULT_MANIFEST_TIMEOUT_SECS.
+   * @return ManifestResponse on success.
+   *         Failure on process failure.
+   */
+  process::Future<ManifestResponse> getManifest(
+      const std::string& path,
+      const Option<std::string>& tag,
+      const Option<Duration>& timeout);
+
+  /**
+   * Fetches blob for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param digest digest of the blob (from manifest).
+   * @param filePath file path to store the fetched blob.
+   * @param timeout Maximum time after which the request will timeout and
+   *    return a failure. Will default to DEFAULT_MANIFEST_TIMEOUT_SECS.
+   * @param maxSize Maximum size of the response that is acceptable. Will
+   *    default to DEFAULT_MANIFEST_MAXSIZE_BYTES.
+   * @return size of downloaded blob on success.
+   *         Failure in case of any errors.
+   */
+  process::Future<size_t> getBlob(
+      const std::string& path,
+      const Option<std::string>& digest,
+      const Path& filePath,
+      const Option<Duration>& timeout,
+      const Option<size_t>& maxSize);
+
+  ~RegistryClient();
+
+private:
+  // Private: instances are obtained through create().
+  RegistryClient(
+    const process::http::URL& authServer,
+    const process::http::URL& registryServer,
+    const Option<Credentials>& credentials,
+    const process::Owned<RegistryClientProcess>& process);
+
+  // Defaults used when the corresponding optional argument is None.
+  static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS;
+  static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES;
+
+  const process::http::URL authServer_;
+  const process::http::URL registryServer_;
+  const Option<Credentials> credentials_;
+  process::Owned<RegistryClientProcess> process_;
+
+  // Not copyable.
+  RegistryClient(const RegistryClient&) = delete;
+  RegistryClient& operator = (const RegistryClient&) = delete;
+};
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONERS_DOCKER_REGISTRY_CLIENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/34750ceb/src/tests/provisioners/docker_provisioner_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/provisioners/docker_provisioner_tests.cpp b/src/tests/provisioners/docker_provisioner_tests.cpp
index ff29d56..91ac343 100644
--- a/src/tests/provisioners/docker_provisioner_tests.cpp
+++ b/src/tests/provisioners/docker_provisioner_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/ssl/gtest.hpp>
 
+#include "slave/containerizer/provisioners/docker/registry_client.hpp"
 #include "slave/containerizer/provisioners/docker/token_manager.hpp"
 
 #include "tests/mesos.hpp"
@@ -42,6 +43,8 @@ using std::vector;
 using namespace mesos::internal::slave::docker::registry;
 using namespace process;
 
+using ManifestResponse = RegistryClient::ManifestResponse;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -69,6 +72,30 @@ protected:
     return  hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
   }
 
+  // Sets `claimsJsonString` to a claims object granting "pull" on
+  // "library/busybox" that expires 365 days from now, and returns the
+  // result of getTokenString() for it.
+  string getDefaultTokenString()
+  {
+    // Expiry of the token, in seconds, one year after the current clock time.
+    const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+    claimsJsonString =
+      "{\"access\" \
+        :[ \
+        { \
+          \"type\":\"repository\", \
+            \"name\":\"library/busybox\", \
+            \"actions\":[\"pull\"]}], \
+            \"aud\":\"registry.docker.io\", \
+            \"exp\":" + stringify(expirySecs) + ", \
+            \"iat\":1438887168, \
+            \"iss\":\"auth.docker.io\", \
+            \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+            \"nbf\":1438887166, \
+            \"sub\":\"\" \
+        }";
+
+    return getTokenString();
+  }
+
   const string signBase64 = base64::encode("{\"\"}");
   string claimsJsonString;
 };
@@ -77,12 +104,12 @@ protected:
 /**
  * Fixture for testing TokenManager component.
  */
-class DockerRegistryTokenTest : public TokenHelper, public ::testing::Test
+class RegistryTokenTest : public TokenHelper, public ::testing::Test
 {};
 
 
 // Tests JSON Web Token parsing for a valid token string.
-TEST_F(DockerRegistryTokenTest, ValidToken)
+TEST_F(RegistryTokenTest, ValidToken)
 {
   const double expirySecs = Clock::now().secs() + Days(365).secs();
 
@@ -110,7 +137,7 @@ TEST_F(DockerRegistryTokenTest, ValidToken)
 
 // Tests JSON Web Token parsing for a token string with expiration date in the
 // past.
-TEST_F(DockerRegistryTokenTest, ExpiredToken)
+TEST_F(RegistryTokenTest, ExpiredToken)
 {
   const double expirySecs = Clock::now().secs() - Days(365).secs();
 
@@ -137,7 +164,7 @@ TEST_F(DockerRegistryTokenTest, ExpiredToken)
 
 
 // Tests JSON Web Token parsing for a token string with no expiration date.
-TEST_F(DockerRegistryTokenTest, NoExpiration)
+TEST_F(RegistryTokenTest, NoExpiration)
 {
   claimsJsonString =
     "{\"access\" \
@@ -162,7 +189,7 @@ TEST_F(DockerRegistryTokenTest, NoExpiration)
 
 // Tests JSON Web Token parsing for a token string with not-before date in the
 // future.
-TEST_F(DockerRegistryTokenTest, NotBeforeInFuture)
+TEST_F(RegistryTokenTest, NotBeforeInFuture)
 {
   const double expirySecs = Clock::now().secs() + Days(365).secs();
   const double nbfSecs = Clock::now().secs() + Days(7).secs();
@@ -193,29 +220,36 @@ TEST_F(DockerRegistryTokenTest, NotBeforeInFuture)
 #ifdef USE_SSL_SOCKET
 
 // Test suite for docker registry tests.
-class DockerRegistryClientTest : public virtual SSLTest, public TokenHelper
+class RegistryClientTest : public virtual SSLTest, public TokenHelper
 {
 protected:
-  DockerRegistryClientTest() {}
+  RegistryClientTest() {}
 
   static void SetUpTestCase()
   {
     SSLTest::SetUpTestCase();
-    // TODO(jojy): Add registry specific directory setup. Will be added in the
-    // next patch when docker registry client tests are added.
+
+    if (os::mkdir(RegistryClientTest::OUTPUT_DIR).isError()) {
+      SSLTest::cleanup_directories();
+      ABORT("Could not create temporary directory: " +
+          RegistryClientTest::OUTPUT_DIR);
+    }
   }
 
   static void TearDownTestCase()
   {
     SSLTest::TearDownTestCase();
-    // TODO(jojy): Add registry specific directory cleanup. Will be added in the
-    // next patch when docker registry client tests are added.
+
+    os::rmdir(RegistryClientTest::OUTPUT_DIR);
   }
+
+  static const string OUTPUT_DIR;
 };
 
+const string RegistryClientTest::OUTPUT_DIR = "output_dir";
 
 // Tests TokenManager for a simple token request.
-TEST_F(DockerRegistryClientTest, SimpleGetToken)
+TEST_F(RegistryClientTest, SimpleGetToken)
 {
   Try<Socket> server = setup_server({
       {"SSL_ENABLED", "true"},
@@ -282,7 +316,7 @@ TEST_F(DockerRegistryClientTest, SimpleGetToken)
 
 
 // Tests TokenManager for bad token response from server.
-TEST_F(DockerRegistryClientTest, BadTokenResponse)
+TEST_F(RegistryClientTest, BadTokenResponse)
 {
   Try<Socket> server = setup_server({
       {"SSL_ENABLED", "true"},
@@ -329,7 +363,7 @@ TEST_F(DockerRegistryClientTest, BadTokenResponse)
 
 
 // Tests TokenManager for request to invalid server.
-TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
+TEST_F(RegistryClientTest, BadTokenServerAddress)
 {
   // Create an invalid URL with current time.
   const http::URL url("https", stringify(Clock::now().secs()), 0);
@@ -346,8 +380,247 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
   AWAIT_FAILED(token);
 }
 
-#endif // USE_SSL_SOCKET
 
+// Tests docker registry's getManifest API.
+TEST_F(RegistryClientTest, SimpleGetManifest)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  Future<ManifestResponse> manifestResponseFuture =
+    registryClient.get()->getManifest("library/busybox", "latest", None());
+
+  const string unauthResponseHeaders = "Www-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response for a manifest request.
+  Future<string> manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Manifest response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+
+  const string manifestResponse = " \
+    { \
+      \"schemaVersion\": 1, \
+      \"name\": \"library/busybox\", \
+      \"tag\": \"latest\",  \
+      \"architecture\": \"amd64\",  \
+      \"fsLayers\": [ \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        } \
+      ],  \
+       \"signatures\": [  \
+          { \
+             \"header\": {  \
+                \"jwk\": {  \
+                   \"crv\": \"P-256\",  \
+                   \"kid\": \
+           \"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\",  \
+                   \"kty\": \"EC\", \
+                   \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\", \
+                   \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\" \
+                },  \
+                \"alg\": \"ES256\"  \
+             }, \
+             \"signature\": \
+\"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgOEarAs7_"
+"4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\", \
+       \"protected\": \
+       \"eyJmb3JtYXRMZW5ndGgiOjUwNTgsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS"
+       "0wOC0xMVQwMzo0Mjo1OVoifQ\"  \
+          } \
+       ]  \
+    }";
+
+  const string manifestHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(manifestResponse.length()) + "\r\n" +
+    "Docker-Content-Digest: "
+    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
+    + "\r\n" +
+    "\r\n" +
+    manifestResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
+
+  AWAIT_ASSERT_READY(manifestResponseFuture);
+}
+
+
+// Tests docker registry's getBlob API.
+TEST_F(RegistryClientTest, SimpleGetBlob)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  const Path blobPath(RegistryClientTest::OUTPUT_DIR + "/blob");
+
+  Future<size_t> resultFuture =
+    registryClient.get()->getBlob(
+        "/blob",
+        "digest",
+        blobPath,
+        None(),
+        None());
+
+  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response.
+  Future<string> blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Send token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Send redirect.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  const string redirectHttpResponse =
+    string("HTTP/1.1 307 Temporary Redirect\r\n") +
+    "Location: https://" +
+    stringify(server.get().address().get()) + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
+
+  // Finally send blob response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  const string blobResponse = stringify(Clock::now());
+
+  const string blobHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(blobResponse.length()) + "\r\n" +
+    "\r\n" +
+    blobResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(blobHttpResponse));
+
+  AWAIT_ASSERT_READY(resultFuture);
+
+  Try<string> blob = os::read(blobPath);
+  ASSERT_SOME(blob);
+  ASSERT_EQ(blob.get(), blobResponse);
+}
+
+#endif // USE_SSL_SOCKET
 
 } // namespace tests {
 } // namespace internal {


[2/2] mesos git commit: Handle bad request in Docker registry client.

Posted by tn...@apache.org.
Handle bad request in Docker registry client.

Review: https://reviews.apache.org/r/38289


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7b7a53c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7b7a53c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7b7a53c

Branch: refs/heads/master
Commit: d7b7a53c1f1be9ccc570cdfc25ac62fce3726b51
Parents: 34750ce
Author: Timothy Chen <tn...@apache.org>
Authored: Wed Sep 9 17:33:22 2015 -0700
Committer: Timothy Chen <tn...@gmail.com>
Committed: Mon Sep 14 13:34:08 2015 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   4 +-
 .../provisioners/docker/registry_client.cpp     |  87 ++-
 .../provisioners/docker/registry_client.hpp     |   4 +-
 .../containerizer/docker_provisioner_tests.cpp  | 683 +++++++++++++++++++
 .../provisioners/docker_provisioner_tests.cpp   | 627 -----------------
 5 files changed, 748 insertions(+), 657 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d7b7a53c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 8c46539..509256f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1664,13 +1664,12 @@ mesos_tests_SOURCES =						\
   tests/paths_tests.cpp						\
   tests/persistent_volume_tests.cpp				\
   tests/protobuf_io_tests.cpp					\
-  tests/provisioners/docker_provisioner_tests.cpp		\
   tests/rate_limiting_tests.cpp					\
   tests/reconciliation_tests.cpp				\
   tests/registrar_tests.cpp					\
   tests/repair_tests.cpp					\
   tests/reservation_tests.cpp					\
-  tests/reservation_endpoints_tests.cpp					\
+  tests/reservation_endpoints_tests.cpp                         \
   tests/resource_offers_tests.cpp				\
   tests/resources_tests.cpp					\
   tests/scheduler_tests.cpp					\
@@ -1692,6 +1691,7 @@ mesos_tests_SOURCES =						\
   tests/containerizer/composing_containerizer_tests.cpp		\
   tests/containerizer/docker_containerizer_tests.cpp		\
   tests/containerizer/docker_tests.cpp				\
+  tests/containerizer/docker_provisioner_tests.cpp	        \
   tests/containerizer/external_containerizer_test.cpp		\
   tests/containerizer/isolator_tests.cpp			\
   tests/containerizer/memory_test_helper.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7b7a53c/src/slave/containerizer/provisioners/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.cpp b/src/slave/containerizer/provisioners/docker/registry_client.cpp
index fce0563..b262ef0 100644
--- a/src/slave/containerizer/provisioners/docker/registry_client.cpp
+++ b/src/slave/containerizer/provisioners/docker/registry_client.cpp
@@ -20,6 +20,7 @@
 
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
+#include <process/io.hpp>
 
 #include "slave/containerizer/provisioners/docker/registry_client.hpp"
 #include "slave/containerizer/provisioners/docker/token_manager.hpp"
@@ -264,6 +265,45 @@ RegistryClientProcess::doHttpGet(
       // Set the future if we get a OK response.
       if (httpResponse.status == "200 OK") {
         return httpResponse;
+      } else if (httpResponse.status == "400 Bad Request") {
+        Try<JSON::Object> errorResponse =
+          JSON::parse<JSON::Object>(httpResponse.body);
+
+        if (errorResponse.isError()) {
+          return Failure("Failed to parse bad request response JSON: " +
+                         errorResponse.error());
+        }
+
+        std::ostringstream out;
+        bool first = true;
+        Result<JSON::Array> errorObjects =
+          errorResponse.get().find<JSON::Array>("errors");
+
+        if (errorObjects.isError()) {
+          return Failure("Failed to find 'errors' in bad request response: " +
+                         errorObjects.error());
+        } else if (errorObjects.isNone()) {
+          return Failure("Errors not found in bad request response");
+        }
+
+        foreach (const JSON::Value& error, errorObjects.get().values) {
+          Result<JSON::String> message =
+            error.as<JSON::Object>().find<JSON::String>("message");
+          if (message.isError()) {
+            return Failure("Failed to parse bad request error message: " +
+                           message.error());
+          } else if (message.isNone()) {
+            continue;
+          }
+
+          if (first) {
+            out << message.get().value;
+            first = false;
+          } else {
+            out << ", " << message.get().value;
+          }
+        }
+        return Failure("Received Bad request, errors: [" + out.str() + "]");
       }
 
       // Prevent infinite recursion.
@@ -410,6 +450,10 @@ Future<ManifestResponse> RegistryClientProcess::getManifest(
 
   auto getManifestResponse = [](
       const Response& httpResponse) -> Try<ManifestResponse> {
+    if (!httpResponse.headers.contains("Docker-Content-Digest")) {
+      return Error("Docker-Content-Digest header missing in response");
+    }
+
     Try<JSON::Object> responseJSON =
       JSON::parse<JSON::Object>(httpResponse.body);
 
@@ -417,10 +461,6 @@ Future<ManifestResponse> RegistryClientProcess::getManifest(
       return Error(responseJSON.error());
     }
 
-    if (!httpResponse.headers.contains("Docker-Content-Digest")) {
-      return Error("Docker-Content-Digest header missing in response");
-    }
-
     Result<JSON::String> name = responseJSON.get().find<JSON::String>("name");
     if (name.isNone()) {
       return Error("Failed to find \"name\" in manifest response");
@@ -434,7 +474,7 @@ Future<ManifestResponse> RegistryClientProcess::getManifest(
     }
 
     vector<FileSystemLayerInfo> fsLayerInfoList;
-    foreach(const JSON::Value& layer, fsLayers.get().values) {
+    foreach (const JSON::Value& layer, fsLayers.get().values) {
       const JSON::Object& layerInfoJSON = layer.as<JSON::Object>();
       Result<JSON::String> blobSumInfo =
         layerInfoJSON.find<JSON::String>("blobSum");
@@ -456,10 +496,8 @@ Future<ManifestResponse> RegistryClientProcess::getManifest(
 
   return doHttpGet(manifestURL, None(), timeout, true, None())
     .then([getManifestResponse] (
-        const Future<Response>&  httpResponseFuture
-        ) -> Future<ManifestResponse> {
-      Try<ManifestResponse> manifestResponse =
-        getManifestResponse(httpResponseFuture.get());
+        const Response& response) -> Future<ManifestResponse> {
+      Try<ManifestResponse> manifestResponse = getManifestResponse(response);
 
       if (manifestResponse.isError()) {
         return Failure(
@@ -507,30 +545,27 @@ Future<size_t> RegistryClientProcess::getBlob(
     "v2/" + path + "/blobs/" + digest.getOrElse("");
 
   auto saveBlob = [filePath](
-      const Response& httpResponse) -> Try<size_t> {
-    Try<Nothing> writeResult =
-      os::write(filePath, httpResponse.body);
-
+      const Response& httpResponse) -> Future<size_t> {
     // TODO(jojy): Add verification step.
     // TODO(jojy): Add check for max size.
-
-    if (writeResult.isError()) {
-      return Error(writeResult.error());
+    size_t size = httpResponse.body.length();
+    Try<int> fd = os::open(
+        filePath.value,
+        O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
+        S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+    if (fd.isError()) {
+      return Failure("Failed to open file '" + filePath.value + "': " +
+                     fd.error());
     }
 
-    return httpResponse.body.length();
+    return process::io::write(fd.get(), httpResponse.body)
+      .then([size](const Future<Nothing>&) { return size; })
+      .onAny([fd]() { os::close(fd.get()); } );
   };
 
   return doHttpGet(blobURL, None(), timeout, true, None())
-    .then([saveBlob](
-        const Future<Response>&  httpResponseFuture) -> Future<size_t> {
-      Try<size_t> blobSaved = saveBlob(httpResponseFuture.get());
-      if (blobSaved.isError()) {
-        return Failure("Failed to save blob: " + blobSaved.error());
-      }
-
-     return blobSaved.get();
-    });
+    .then([saveBlob](const Response& response) { return saveBlob(response); });
 }
 
 } // namespace registry {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7b7a53c/src/slave/containerizer/provisioners/docker/registry_client.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/provisioners/docker/registry_client.hpp b/src/slave/containerizer/provisioners/docker/registry_client.hpp
index 184ca0f..b5e2858 100644
--- a/src/slave/containerizer/provisioners/docker/registry_client.hpp
+++ b/src/slave/containerizer/provisioners/docker/registry_client.hpp
@@ -54,7 +54,7 @@ public:
   };
 
   /**
-   * Encapsulates response of "GET Mannifest" request.
+   * Encapsulates response of "GET Manifest" request.
    *
    * Reference: https://docs.docker.com/registry/spec/api
    */
@@ -78,7 +78,7 @@ public:
      */
     const Option<std::string> password;
     /**
-     * Account for fetching data from  registry.
+     * Account for fetching data from registry.
      */
     const Option<std::string> account;
   };

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7b7a53c/src/tests/containerizer/docker_provisioner_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/docker_provisioner_tests.cpp b/src/tests/containerizer/docker_provisioner_tests.cpp
new file mode 100644
index 0000000..53f15e0
--- /dev/null
+++ b/src/tests/containerizer/docker_provisioner_tests.cpp
@@ -0,0 +1,683 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <stout/duration.hpp>
+
+#include <process/address.hpp>
+#include <process/clock.hpp>
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/owned.hpp>
+#include <process/socket.hpp>
+#include <process/subprocess.hpp>
+
+#include <process/ssl/gtest.hpp>
+
+#include "slave/containerizer/provisioners/docker/registry_client.hpp"
+#include "slave/containerizer/provisioners/docker/token_manager.hpp"
+
+#include "tests/mesos.hpp"
+
+using std::map;
+using std::string;
+using std::vector;
+
+using namespace mesos::internal::slave::docker::registry;
+using namespace process;
+
+using ManifestResponse = RegistryClient::ManifestResponse;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+
+/**
+ * Builds "header.claims.signature" token strings from base64-encoded parts.
+ */
+class TokenHelper {
+protected:
+  const string hdrBase64 = base64::encode(
+    "{ \
+      \"alg\":\"ES256\", \
+      \"typ\":\"JWT\", \
+      \"x5c\":[\"test\"] \
+    }");
+
+  string getClaimsBase64() const
+  {
+    return base64::encode(claimsJsonString);
+  }
+
+  string getTokenString() const
+  {
+    return  hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
+  }
+
+  string getDefaultTokenString()
+  {
+    // Overwrite the claims with defaults whose "exp" is 365 days from now.
+    const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+    claimsJsonString =
+      "{\"access\" \
+        :[ \
+        { \
+          \"type\":\"repository\", \
+            \"name\":\"library/busybox\", \
+            \"actions\":[\"pull\"]}], \
+            \"aud\":\"registry.docker.io\", \
+            \"exp\":" + stringify(expirySecs) + ", \
+            \"iat\":1438887168, \
+            \"iss\":\"auth.docker.io\", \
+            \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+            \"nbf\":1438887166, \
+            \"sub\":\"\" \
+        }";
+
+    return getTokenString();
+  }
+
+  const string signBase64 = base64::encode("{\"\"}");
+  string claimsJsonString;
+};
+
+
+/**
+ * Fixture for the JSON Web Token parsing tests (Token::create).
+ */
+class RegistryTokenTest : public TokenHelper, public ::testing::Test
+{};
+
+
+// Tests JSON Web Token parsing for a valid token string.
+TEST_F(RegistryTokenTest, ValidToken)
+{
+  const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"exp\":" + stringify(expirySecs) + ", \
+          \"iat\":1438887168, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":1438887166, \
+          \"sub\":\"\" \
+         }";
+
+  Try<Token> token = Token::create(getTokenString());
+
+  ASSERT_SOME(token);
+}
+
+
+// Tests JSON Web Token parsing for a token string with expiration date in the
+// past.
+TEST_F(RegistryTokenTest, ExpiredToken)
+{
+  const double expirySecs = Clock::now().secs() - Days(365).secs();
+
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"exp\":" + stringify(expirySecs) + ", \
+          \"iat\":1438887166, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":1438887166, \
+          \"sub\":\"\" \
+         }";
+
+  Try<Token> token = Token::create(getTokenString());
+
+  EXPECT_ERROR(token);
+}
+
+
+// Tests JSON Web Token parsing for a token string with no expiration date.
+TEST_F(RegistryTokenTest, NoExpiration)
+{
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"iat\":1438887166, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":1438887166, \
+          \"sub\":\"\" \
+      }";
+
+  const Try<Token> token = Token::create(getTokenString());
+
+  ASSERT_SOME(token);
+}
+
+
+// Tests that a token string whose not-before date is in the future parses
+// successfully but is reported as not valid.
+TEST_F(RegistryTokenTest, NotBeforeInFuture)
+{
+  const double expirySecs = Clock::now().secs() + Days(365).secs();
+  const double nbfSecs = Clock::now().secs() + Days(7).secs();
+
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"exp\":" + stringify(expirySecs) + ", \
+          \"iat\":1438887166, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":" + stringify(nbfSecs) + ", \
+          \"sub\":\"\" \
+         }";
+
+  const Try<Token> token = Token::create(getTokenString());
+
+  ASSERT_SOME(token);
+  ASSERT_FALSE(token.get().isValid());
+}
+
+
+#ifdef USE_SSL_SOCKET
+
+// Fixture for Docker registry client tests; creates and removes OUTPUT_DIR.
+class RegistryClientTest : public virtual SSLTest, public TokenHelper
+{
+protected:
+  RegistryClientTest() {}
+
+  static void SetUpTestCase()
+  {
+    SSLTest::SetUpTestCase();
+
+    if (os::mkdir(RegistryClientTest::OUTPUT_DIR).isError()) {
+      SSLTest::cleanup_directories();
+      ABORT("Could not create temporary directory: " +
+          RegistryClientTest::OUTPUT_DIR);
+    }
+  }
+
+  static void TearDownTestCase()
+  {
+    SSLTest::TearDownTestCase();
+
+    os::rmdir(RegistryClientTest::OUTPUT_DIR);
+  }
+
+  static const string OUTPUT_DIR;
+};
+
+const string RegistryClientTest::OUTPUT_DIR = "output_dir";
+
+// Tests TokenManager for a simple token request.
+TEST_F(RegistryClientTest, SimpleGetToken)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  // Create URL from server hostname and port.
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
+  ASSERT_SOME(tokenMgr);
+
+  Future<Token> token =
+    tokenMgr.get()->getToken(
+        "registry.docker.io",
+        "repository:library/busybox:pull",
+        None());
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Construct response and send(server side).
+  const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"exp\":" + stringify(expirySecs) + ", \
+          \"iat\":1438887168, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":1438887166, \
+          \"sub\":\"\" \
+         }";
+
+  const string tokenString(getTokenString());
+  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
+
+  const string buffer =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
+
+  AWAIT_ASSERT_READY(token);
+  ASSERT_EQ(token.get().raw, tokenString);
+}
+
+
+// Tests TokenManager for bad token response from server.
+TEST_F(RegistryClientTest, BadTokenResponse)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  // Create URL from server hostname and port.
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
+  ASSERT_SOME(tokenMgr);
+
+  Future<Token> token =
+    tokenMgr.get()->getToken(
+        "registry.docker.io",
+        "repository:library/busybox:pull",
+        None());
+
+  AWAIT_ASSERT_READY(socket);
+
+  const string tokenString("bad token");
+  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
+
+  const string buffer =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
+
+  AWAIT_FAILED(token);
+}
+
+
+// Tests TokenManager for request to invalid server.
+TEST_F(RegistryClientTest, BadTokenServerAddress)
+{
+  // Create an invalid URL with current time.
+  const http::URL url("https", stringify(Clock::now().secs()), 0);
+
+  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
+  ASSERT_SOME(tokenMgr);
+
+  Future<Token> token =
+    tokenMgr.get()->getToken(
+        "registry.docker.io",
+        "repository:library/busybox:pull",
+        None());
+
+  AWAIT_FAILED(token);
+}
+
+
+// Tests docker registry's getManifest API.
+TEST_F(RegistryClientTest, SimpleGetManifest)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  Future<ManifestResponse> manifestResponseFuture =
+    registryClient.get()->getManifest("library/busybox", "latest", None());
+
+  const string unauthResponseHeaders = "Www-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response for a manifest request.
+  Future<string> manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Manifest response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  manifestHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
+
+  const string manifestResponse = " \
+    { \
+      \"schemaVersion\": 1, \
+      \"name\": \"library/busybox\", \
+      \"tag\": \"latest\",  \
+      \"architecture\": \"amd64\",  \
+      \"fsLayers\": [ \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\"  \
+        },  \
+        { \
+          \"blobSum\": \
+  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
+        } \
+      ],  \
+       \"signatures\": [  \
+          { \
+             \"header\": {  \
+                \"jwk\": {  \
+                   \"crv\": \"P-256\",  \
+                   \"kid\": \
+           \"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\",  \
+                   \"kty\": \"EC\", \
+                   \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\", \
+                   \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\" \
+                },  \
+                \"alg\": \"ES256\"  \
+             }, \
+             \"signature\": \
+\"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgOEarAs7_"
+"4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\", \
+       \"protected\": \
+       \"eyJmb3JtYXRMZW5ndGgiOjUwNTgsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS"
+       "0wOC0xMVQwMzo0Mjo1OVoifQ\"  \
+          } \
+       ]  \
+    }";
+
+  const string manifestHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(manifestResponse.length()) + "\r\n" +
+    "Docker-Content-Digest: "
+    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
+    + "\r\n" +
+    "\r\n" +
+    manifestResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
+
+  AWAIT_ASSERT_READY(manifestResponseFuture);
+}
+
+
+// Tests docker registry's getBlob API.
+//
+// The test plays the server side of four consecutive connections:
+//   1. The blob request, answered with '401 Unauthorized'.
+//   2. The token request, answered with a bearer token.
+//   3. The next blob request, answered with '307 Temporary Redirect'.
+//   4. The blob request after the redirect, answered with the blob.
+// Finally the file read back from 'blobPath' must match the blob sent.
+TEST_F(RegistryClientTest, SimpleGetBlob)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  // Start accepting before the client issues its first request.
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  // Both URLs handed to the client point at the test server.
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  const Path blobPath(RegistryClientTest::OUTPUT_DIR + "/blob");
+
+  Future<size_t> resultFuture =
+    registryClient.get()->getBlob(
+        "/blob",
+        "digest",
+        blobPath,
+        None(),
+        None());
+
+  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
+    " realm=\"https://auth.docker.io/token\","
+    "service=" + stringify(server.get().address().get()) + ","
+    "scope=\"repository:library/busybox:pull\"";
+
+  const string unauthHttpResponse =
+    string("HTTP/1.1 401 Unauthorized\r\n") +
+    unauthResponseHeaders + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 401 Unauthorized response.
+  Future<string> blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
+
+  // Send token response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(tokenRequestFuture);
+
+  const string tokenResponse =
+    "{\"token\":\"" + getDefaultTokenString() + "\"}";
+
+  // NOTE: No whitespace is allowed between a header field name and
+  // the colon, hence "Content-Length: " and not "Content-Length : ".
+  const string tokenHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length: " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
+
+  // Send redirect.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  // The redirect location is the test server itself, so the follow-up
+  // request arrives on the same listening socket.
+  const string redirectHttpResponse =
+    string("HTTP/1.1 307 Temporary Redirect\r\n") +
+    "Location: https://" +
+    stringify(server.get().address().get()) + "\r\n" +
+    "\r\n";
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
+
+  // Finally send blob response.
+  socket = server.get().accept();
+  AWAIT_ASSERT_READY(socket);
+
+  blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+
+  // Use the current clock value as the blob payload.
+  const string blobResponse = stringify(Clock::now());
+
+  const string blobHttpResponse =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length: " +
+    stringify(blobResponse.length()) + "\r\n" +
+    "\r\n" +
+    blobResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(blobHttpResponse));
+
+  AWAIT_ASSERT_READY(resultFuture);
+
+  // The file at 'blobPath' must hold exactly the payload sent above.
+  Try<string> blob = os::read(blobPath);
+  ASSERT_SOME(blob);
+  ASSERT_EQ(blobResponse, blob.get());
+}
+
+
+// Tests that getBlob fails when the server answers the blob request
+// with '400 Bad Request', and that the failure message contains the
+// error messages carried in the response body.
+TEST_F(RegistryClientTest, BadRequest)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  // Start accepting before the client issues its request.
+  Future<Socket> socket = server.get().accept();
+
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  // Both URLs handed to the client point at the test server.
+  Try<Owned<RegistryClient>> registryClient =
+    RegistryClient::create(url, url, None());
+
+  ASSERT_SOME(registryClient);
+
+  const Path blobPath(RegistryClientTest::OUTPUT_DIR + "/blob");
+
+  Future<size_t> resultFuture =
+    registryClient.get()->getBlob(
+        "/blob",
+        "digest",
+        blobPath,
+        None(),
+        None());
+
+  const string badRequestResponse =
+    "{\"errors\": [{\"message\": \"Error1\" }, {\"message\": \"Error2\"}]}";
+
+  // NOTE: No whitespace is allowed between a header field name and
+  // the colon, hence "Content-Length: " and not "Content-Length : ".
+  const string badRequestHttpResponse =
+    string("HTTP/1.1 400 Bad Request\r\n") +
+    "Content-Length: " + stringify(badRequestResponse.length()) + "\r\n" +
+    "\r\n" +
+    badRequestResponse;
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Send 400 Bad Request.
+  Future<string> blobHttpRequestFuture = Socket(socket.get()).recv();
+  AWAIT_ASSERT_READY(blobHttpRequestFuture);
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(badRequestHttpResponse));
+
+  AWAIT_FAILED(resultFuture);
+
+  // Every error message from the response body must be reported.
+  ASSERT_TRUE(strings::contains(resultFuture.failure(), "Error1"));
+  ASSERT_TRUE(strings::contains(resultFuture.failure(), "Error2"));
+}
+
+#endif // USE_SSL_SOCKET
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7b7a53c/src/tests/provisioners/docker_provisioner_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/provisioners/docker_provisioner_tests.cpp b/src/tests/provisioners/docker_provisioner_tests.cpp
deleted file mode 100644
index 91ac343..0000000
--- a/src/tests/provisioners/docker_provisioner_tests.cpp
+++ /dev/null
@@ -1,627 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <stout/duration.hpp>
-
-#include <process/address.hpp>
-#include <process/clock.hpp>
-#include <process/future.hpp>
-#include <process/gmock.hpp>
-#include <process/owned.hpp>
-#include <process/socket.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/ssl/gtest.hpp>
-
-#include "slave/containerizer/provisioners/docker/registry_client.hpp"
-#include "slave/containerizer/provisioners/docker/token_manager.hpp"
-
-#include "tests/mesos.hpp"
-
-using std::map;
-using std::string;
-using std::vector;
-
-using namespace mesos::internal::slave::docker::registry;
-using namespace process;
-
-using ManifestResponse = RegistryClient::ManifestResponse;
-
-namespace mesos {
-namespace internal {
-namespace tests {
-
-
-/**
- * Provides token operations and defaults.
- */
-class TokenHelper {
-protected:
-  const string hdrBase64 = base64::encode(
-    "{ \
-      \"alg\":\"ES256\", \
-      \"typ\":\"JWT\", \
-      \"x5c\":[\"test\"] \
-    }");
-
-  string getClaimsBase64() const
-  {
-    return base64::encode(claimsJsonString);
-  }
-
-  string getTokenString() const
-  {
-    return  hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
-  }
-
-  string getDefaultTokenString()
-  {
-    // Construct response and send(server side).
-    const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-    claimsJsonString =
-      "{\"access\" \
-        :[ \
-        { \
-          \"type\":\"repository\", \
-            \"name\":\"library/busybox\", \
-            \"actions\":[\"pull\"]}], \
-            \"aud\":\"registry.docker.io\", \
-            \"exp\":" + stringify(expirySecs) + ", \
-            \"iat\":1438887168, \
-            \"iss\":\"auth.docker.io\", \
-            \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-            \"nbf\":1438887166, \
-            \"sub\":\"\" \
-        }";
-
-    return getTokenString();
-  }
-
-  const string signBase64 = base64::encode("{\"\"}");
-  string claimsJsonString;
-};
-
-
-/**
- * Fixture for testing TokenManager component.
- */
-class RegistryTokenTest : public TokenHelper, public ::testing::Test
-{};
-
-
-// Tests JSON Web Token parsing for a valid token string.
-TEST_F(RegistryTokenTest, ValidToken)
-{
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887168, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with expiration date in the
-// past.
-TEST_F(RegistryTokenTest, ExpiredToken)
-{
-  const double expirySecs = Clock::now().secs() - Days(365).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  Try<Token> token = Token::create(getTokenString());
-
-  EXPECT_ERROR(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with no expiration date.
-TEST_F(RegistryTokenTest, NoExpiration)
-{
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-      }";
-
-  const Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with not-before date in the
-// future.
-TEST_F(RegistryTokenTest, NotBeforeInFuture)
-{
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-  const double nbfSecs = Clock::now().secs() + Days(7).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887166, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":" + stringify(nbfSecs) + ", \
-          \"sub\":\"\" \
-         }";
-
-  const Try<Token> token = Token::create(getTokenString());
-
-  ASSERT_SOME(token);
-  ASSERT_EQ(token.get().isValid(), false);
-}
-
-
-#ifdef USE_SSL_SOCKET
-
-// Test suite for docker registry tests.
-class RegistryClientTest : public virtual SSLTest, public TokenHelper
-{
-protected:
-  RegistryClientTest() {}
-
-  static void SetUpTestCase()
-  {
-    SSLTest::SetUpTestCase();
-
-    if (os::mkdir(RegistryClientTest::OUTPUT_DIR).isError()) {
-      SSLTest::cleanup_directories();
-      ABORT("Could not create temporary directory: " +
-          RegistryClientTest::OUTPUT_DIR);
-    }
-  }
-
-  static void TearDownTestCase()
-  {
-    SSLTest::TearDownTestCase();
-
-    os::rmdir(RegistryClientTest::OUTPUT_DIR);
-  }
-
-  static const string OUTPUT_DIR;
-};
-
-const string RegistryClientTest::OUTPUT_DIR = "output_dir";
-
-// Tests TokenManager for a simple token request.
-TEST_F(RegistryClientTest, SimpleGetToken)
-{
-  Try<Socket> server = setup_server({
-      {"SSL_ENABLED", "true"},
-      {"SSL_KEY_FILE", key_path().value},
-      {"SSL_CERT_FILE", certificate_path().value}});
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  // Create URL from server hostname and port.
-  const http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Construct response and send(server side).
-  const double expirySecs = Clock::now().secs() + Days(365).secs();
-
-  claimsJsonString =
-    "{\"access\" \
-      :[ \
-        { \
-          \"type\":\"repository\", \
-          \"name\":\"library/busybox\", \
-          \"actions\":[\"pull\"]}], \
-          \"aud\":\"registry.docker.io\", \
-          \"exp\":" + stringify(expirySecs) + ", \
-          \"iat\":1438887168, \
-          \"iss\":\"auth.docker.io\", \
-          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
-          \"nbf\":1438887166, \
-          \"sub\":\"\" \
-         }";
-
-  const string tokenString(getTokenString());
-  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
-  const string buffer =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
-  AWAIT_ASSERT_READY(token);
-  ASSERT_EQ(token.get().raw, tokenString);
-}
-
-
-// Tests TokenManager for bad token response from server.
-TEST_F(RegistryClientTest, BadTokenResponse)
-{
-  Try<Socket> server = setup_server({
-      {"SSL_ENABLED", "true"},
-      {"SSL_KEY_FILE", key_path().value},
-      {"SSL_CERT_FILE", certificate_path().value}});
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  // Create URL from server hostname and port.
-  const http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_ASSERT_READY(socket);
-
-  const string tokenString("bad token");
-  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
-  const string buffer =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
-  AWAIT_FAILED(token);
-}
-
-
-// Tests TokenManager for request to invalid server.
-TEST_F(RegistryClientTest, BadTokenServerAddress)
-{
-  // Create an invalid URL with current time.
-  const http::URL url("https", stringify(Clock::now().secs()), 0);
-
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
-
-  Future<Token> token =
-    tokenMgr.get()->getToken(
-        "registry.docker.io",
-        "repository:library/busybox:pull",
-        None());
-
-  AWAIT_FAILED(token);
-}
-
-
-// Tests docker registry's getManifest API.
-TEST_F(RegistryClientTest, SimpleGetManifest)
-{
-  Try<Socket> server = setup_server({
-      {"SSL_ENABLED", "true"},
-      {"SSL_KEY_FILE", key_path().value},
-      {"SSL_CERT_FILE", certificate_path().value}});
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  const http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<RegistryClient>> registryClient =
-    RegistryClient::create(url, url, None());
-
-  ASSERT_SOME(registryClient);
-
-  Future<ManifestResponse> manifestResponseFuture =
-    registryClient.get()->getManifest("library/busybox", "latest", None());
-
-  const string unauthResponseHeaders = "Www-Authenticate: Bearer"
-    " realm=\"https://auth.docker.io/token\","
-    "service=" + stringify(server.get().address().get()) + ","
-    "scope=\"repository:library/busybox:pull\"";
-
-  const string unauthHttpResponse =
-    string("HTTP/1.1 401 Unauthorized\r\n") +
-    unauthResponseHeaders + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 401 Unauthorized response for a manifest request.
-  Future<string> manifestHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
-  // Token response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(tokenRequestFuture);
-
-  const string tokenResponse =
-    "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
-  const string tokenHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
-  // Manifest response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  manifestHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(manifestHttpRequestFuture);
-
-  const string manifestResponse = " \
-    { \
-      \"schemaVersion\": 1, \
-      \"name\": \"library/busybox\", \
-      \"tag\": \"latest\",  \
-      \"architecture\": \"amd64\",  \
-      \"fsLayers\": [ \
-        { \
-          \"blobSum\": \
-  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
-        },  \
-        { \
-          \"blobSum\": \
-  \"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\"  \
-        },  \
-        { \
-          \"blobSum\": \
-  \"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\"  \
-        } \
-      ],  \
-       \"signatures\": [  \
-          { \
-             \"header\": {  \
-                \"jwk\": {  \
-                   \"crv\": \"P-256\",  \
-                   \"kid\": \
-           \"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\",  \
-                   \"kty\": \"EC\", \
-                   \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\", \
-                   \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\" \
-                },  \
-                \"alg\": \"ES256\"  \
-             }, \
-             \"signature\": \
-\"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgOEarAs7_"
-"4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\", \
-       \"protected\": \
-       \"eyJmb3JtYXRMZW5ndGgiOjUwNTgsImZvcm1hdFRhaWwiOiJDbjAiLCJ0aW1lIjoiMjAxNS"
-       "0wOC0xMVQwMzo0Mjo1OVoifQ\"  \
-          } \
-       ]  \
-    }";
-
-  const string manifestHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(manifestResponse.length()) + "\r\n" +
-    "Docker-Content-Digest: "
-    "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
-    + "\r\n" +
-    "\r\n" +
-    manifestResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
-
-  AWAIT_ASSERT_READY(manifestResponseFuture);
-}
-
-
-// Tests docker registry's getBlob API.
-TEST_F(RegistryClientTest, SimpleGetBlob)
-{
-  Try<Socket> server = setup_server({
-      {"SSL_ENABLED", "true"},
-      {"SSL_KEY_FILE", key_path().value},
-      {"SSL_CERT_FILE", certificate_path().value}});
-
-  ASSERT_SOME(server);
-  ASSERT_SOME(server.get().address());
-  ASSERT_SOME(server.get().address().get().hostname());
-
-  Future<Socket> socket = server.get().accept();
-
-  const http::URL url(
-      "https",
-      server.get().address().get().hostname().get(),
-      server.get().address().get().port);
-
-  Try<Owned<RegistryClient>> registryClient =
-    RegistryClient::create(url, url, None());
-
-  ASSERT_SOME(registryClient);
-
-  const Path blobPath(RegistryClientTest::OUTPUT_DIR + "/blob");
-
-  Future<size_t> resultFuture =
-    registryClient.get()->getBlob(
-        "/blob",
-        "digest",
-        blobPath,
-        None(),
-        None());
-
-  const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
-    " realm=\"https://auth.docker.io/token\","
-    "service=" + stringify(server.get().address().get()) + ","
-    "scope=\"repository:library/busybox:pull\"";
-
-  const string unauthHttpResponse =
-    string("HTTP/1.1 401 Unauthorized\r\n") +
-    unauthResponseHeaders + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(socket);
-
-  // Send 401 Unauthorized response.
-  Future<string> blobHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequestFuture);
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
-  // Send token response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  Future<string> tokenRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(tokenRequestFuture);
-
-  const string tokenResponse =
-    "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
-  const string tokenHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(tokenResponse.length()) + "\r\n" +
-    "\r\n" +
-    tokenResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
-  // Send redirect.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  blobHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequestFuture);
-
-  const string redirectHttpResponse =
-    string("HTTP/1.1 307 Temporary Redirect\r\n") +
-    "Location: https://" +
-    stringify(server.get().address().get()) + "\r\n" +
-    "\r\n";
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
-
-  // Finally send blob response.
-  socket = server.get().accept();
-  AWAIT_ASSERT_READY(socket);
-
-  blobHttpRequestFuture = Socket(socket.get()).recv();
-  AWAIT_ASSERT_READY(blobHttpRequestFuture);
-
-  const string blobResponse = stringify(Clock::now());
-
-  const string blobHttpResponse =
-    string("HTTP/1.1 200 OK\r\n") +
-    "Content-Length : " +
-    stringify(blobResponse.length()) + "\r\n" +
-    "\r\n" +
-    blobResponse;
-
-  AWAIT_ASSERT_READY(Socket(socket.get()).send(blobHttpResponse));
-
-  AWAIT_ASSERT_READY(resultFuture);
-
-  Try<string> blob = os::read(blobPath);
-  ASSERT_SOME(blob);
-  ASSERT_EQ(blob.get(), blobResponse);
-}
-
-#endif // USE_SSL_SOCKET
-
-} // namespace tests {
-} // namespace internal {
-} // namespace mesos {