You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:28 UTC

[04/12] mesos git commit: Relocated MesosContainerizer specific files to the correct location.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
new file mode 100644
index 0000000..f314f20
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/json.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/subprocess.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
+
+using namespace process;
+
+using std::list;
+using std::pair;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class LocalPullerProcess : public process::Process<LocalPullerProcess>
+{
+public:
+  explicit LocalPullerProcess(const Flags& _flags) : flags(_flags) {}
+
+  // Untars the archive of image 'name' into 'directory' and extracts every
+  // layer there; yields (layer id, rootfs path) pairs, base layer first.
+  process::Future<list<pair<string, string>>> pull(
+      const Image::Name& name,
+      const string& directory);
+
+private:
+  process::Future<Nothing> untarImage(
+      const string& tarPath,
+      const string& directory);
+
+  process::Future<list<pair<string, string>>> putImage(
+      const Image::Name& name,
+      const string& directory);
+
+  process::Future<list<pair<string, string>>> putLayers(
+      const string& directory,
+      const vector<string>& layerIds);
+
+  process::Future<pair<string, string>> putLayer(
+      const string& directory,
+      const string& layerId);
+
+  const Flags flags;
+};
+
+
+LocalPuller::LocalPuller(const Flags& flags)
+  : process(new LocalPullerProcess(flags))
+{
+  process::spawn(process.get());
+}
+
+// Terminates the spawned LocalPullerProcess and then waits on it.
+LocalPuller::~LocalPuller()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+// Forwards the request to LocalPullerProcess::pull via dispatch().
+Future<list<pair<string, string>>> LocalPuller::pull(
+    const Image::Name& name,
+    const string& directory)
+{
+  return dispatch(process.get(), &LocalPullerProcess::pull, name, directory);
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::pull(
+    const Image::Name& name,
+    const string& directory)
+{
+  // The archive is expected at <docker_local_archives_dir>/<name>.tar.
+  const string imageName = stringify(name);
+  const string archive =
+    paths::getImageArchiveTarPath(flags.docker_local_archives_dir, imageName);
+  if (!os::exists(archive)) {
+    return Failure("Failed to find archive for image '" + imageName +
+                   "' at '" + archive + "'");
+  }
+
+  return untarImage(archive, directory)
+    .then(defer(self(), &Self::putImage, name, directory));
+}
+
+
+Future<Nothing> LocalPullerProcess::untarImage(
+    const string& tarPath,
+    const string& directory)
+{
+  VLOG(1) << "Untarring image from '" << tarPath
+          << "' to '" << directory << "'";
+
+  // Extract the image archive at 'tarPath' into 'directory' by running
+  // "tar -C <directory> -x -f <tarPath>".
+  // TODO(tnachen): Terminate tar process when slave exits.
+  const vector<string> command = {"tar", "-C", directory, "-x", "-f", tarPath};
+
+  // stdin, stdout and stderr of the child all point at /dev/null.
+  Try<Subprocess> tar = subprocess(
+      "tar",
+      command,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"));
+
+  if (tar.isError()) {
+    return Failure("Failed to create tar subprocess: " + tar.error());
+  }
+
+  return tar.get().status()
+    .then([tarPath](const Option<int>& status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure("Failed to reap status for tar subprocess in " +
+                       tarPath);
+      }
+
+      // Anything other than a normal exit with code 0 is a failure.
+      const bool succeeded =
+        WIFEXITED(status.get()) && WEXITSTATUS(status.get()) == 0;
+
+      if (!succeeded) {
+        return Failure("Untar image failed with exit code: " +
+                       WSTRINGIFY(status.get()));
+      }
+
+      return Nothing();
+    });
+}
+
+
+static Result<string> getParentId(
+    const string& directory,
+    const string& layerId)
+{
+  // The layer's JSON manifest names its parent in the 'parent' field.
+  Try<string> contents =
+    os::read(paths::getImageArchiveLayerManifestPath(directory, layerId));
+  if (contents.isError()) {
+    return Error("Failed to read manifest: " + contents.error());
+  }
+
+  Try<JSON::Object> object = JSON::parse<JSON::Object>(contents.get());
+  if (object.isError()) {
+    return Error("Failed to parse manifest: " + object.error());
+  }
+
+  Result<JSON::String> parent = object.get().find<JSON::String>("parent");
+  if (parent.isError()) {
+    return Error("Failed to read parent of layer: " + parent.error());
+  } else if (parent.isNone() || parent.get() == "") {
+    return None(); // No (or an empty) parent: this is the base layer.
+  }
+
+  return parent.get().value;
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::putImage(
+    const Image::Name& name,
+    const string& directory)
+{
+  // 'repositories' maps repository -> tag -> id of the image's leaf layer.
+  Try<string> value =
+    os::read(paths::getImageArchiveRepositoriesPath(directory));
+
+  if (value.isError()) {
+    return Failure("Failed to read repository JSON: " + value.error());
+  }
+
+  Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get());
+  if (json.isError()) {
+    return Failure("Failed to parse JSON: " + json.error());
+  }
+
+  // We don't use JSON find for either lookup because it treats '.' as a
+  // path separator and repository names and tags may both contain one.
+  std::map<string, JSON::Value>::const_iterator repository =
+    json.get().values.find(name.repository());
+
+  if (repository == json.get().values.end()) {
+    return Failure("Repository '" + name.repository() + "' is not found");
+  } else if (!repository->second.is<JSON::Object>()) {
+    return Failure("Repository JSON value expected to be JSON::Object");
+  }
+
+  const JSON::Object repositoryJson = repository->second.as<JSON::Object>();
+
+  std::map<string, JSON::Value>::const_iterator entry =
+    repositoryJson.values.find(name.tag());
+
+  if (entry == repositoryJson.values.end()) {
+    return Failure("Tag '" + name.tag() + "' is not found");
+  } else if (!entry->second.is<JSON::String>()) {
+    return Failure("Tag JSON value expected to be JSON::String");
+  }
+
+  const string layerId = entry->second.as<JSON::String>().value;
+
+  // Follow the 'parent' links from the leaf layer up to the base layer,
+  // prepending each ancestor so 'layerIds' ends up ordered base first.
+  // getParentId() parses each manifest; failures surface as Error below.
+  vector<string> layerIds;
+  layerIds.push_back(layerId);
+  Result<string> parentId = getParentId(directory, layerId);
+  while (parentId.isSome()) {
+    // A corrupt archive may contain a cyclic parent chain; fail rather
+    // than loop (and grow 'layerIds') forever.
+    foreach (const string& seenId, layerIds) {
+      if (seenId == parentId.get()) {
+        return Failure("Layer '" + seenId + "' is its own ancestor");
+      }
+    }
+    layerIds.insert(layerIds.begin(), parentId.get());
+    parentId = getParentId(directory, parentId.get());
+  }
+
+  if (parentId.isError()) {
+    return Failure("Failed to find parent layer id of layer '" + layerId +
+                   "': " + parentId.error());
+  }
+
+  return putLayers(directory, layerIds);
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::putLayers(
+    const string& directory,
+    const vector<string>& layerIds)
+{
+  // Kick off one extraction per layer, preserving the given order.
+  list<Future<pair<string, string>>> extractions;
+  for (const string& id : layerIds) {
+    extractions.push_back(putLayer(directory, id));
+  }
+  return collect(extractions);
+}
+
+
+Future<pair<string, string>> LocalPullerProcess::putLayer(
+    const string& directory,
+    const string& layerId)
+{
+  // Extracts the tar of layer 'layerId' into the layer's rootfs path
+  // under 'directory' and yields (layerId, rootfs path). Untarring here
+  // rather than directly into the store makes sure we don't end up with
+  // a partially untarred layer rootfs in the store.
+
+  const string localRootfsPath =
+    paths::getImageArchiveLayerRootfsPath(directory, layerId);
+
+  // Image layer has been untarred but is not present in the store directory.
+  if (os::exists(localRootfsPath)) {
+    LOG(WARNING) << "Image layer '" << layerId << "' rootfs present at '"
+                 << localRootfsPath << "' but not in store directory. "
+                 << "Removing staged rootfs and untarring layer again.";
+
+    Try<Nothing> rmdir = os::rmdir(localRootfsPath);
+    if (rmdir.isError()) {
+      return Failure("Failed to remove incomplete staged rootfs for layer '" +
+                     layerId + "': " + rmdir.error());
+    }
+  }
+
+  Try<Nothing> mkdir = os::mkdir(localRootfsPath);
+  if (mkdir.isError()) {
+    return Failure("Failed to create rootfs path '" + localRootfsPath +
+                   "': " + mkdir.error());
+  }
+
+  // Untar directory/id/layer.tar into directory/id/rootfs.
+  // The tar file will be removed when the staging directory is
+  // removed.
+  const vector<string> argv = {
+    "tar",
+    "-C",
+    localRootfsPath,
+    "-x",
+    "-f",
+    paths::getImageArchiveLayerTarPath(directory, layerId)
+  };
+
+  Try<Subprocess> s = subprocess(
+      "tar",
+      argv,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PATH("/dev/null"));
+
+  if (s.isError()) {
+    return Failure("Failed to create tar subprocess: " + s.error());
+  }
+
+  return s.get().status()
+    .then([directory, layerId](
+        const Option<int>& status) -> Future<pair<string, string>> {
+      if (status.isNone()) {
+        return Failure("Failed to reap subprocess to untar image");
+      } else if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) {
+        return Failure("Untar failed with exit code: " +
+                       WSTRINGIFY(status.get()));
+      }
+
+      const string rootfsPath =
+        paths::getImageArchiveLayerRootfsPath(directory, layerId);
+
+      if (!os::exists(rootfsPath)) {
+        return Failure("Failed to find the rootfs path after extracting layer"
+                       " '" + layerId + "'");
+      }
+
+      return pair<string, string>(layerId, rootfsPath);
+    });
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
new file mode 100644
index 0000000..87d8002
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
+#define __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
+
+#include "slave/containerizer/mesos/provisioner/store.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward declaration.
+class LocalPullerProcess;
+
+
+/**
+ * LocalPuller assumes Docker images are stored in a local directory
+ * (configured with flags.docker_local_archives_dir), with each image
+ * saved as a tar whose file name has the form <repo>:<tag>.tar.
+ */
+class LocalPuller : public Puller
+{
+public:
+  explicit LocalPuller(const Flags& flags);
+
+  ~LocalPuller();
+  // Yields one (layer id, rootfs path) pair per layer of image 'name'.
+  process::Future<std::list<std::pair<std::string, std::string>>> pull(
+      const Image::Name& name,
+      const std::string& directory);
+
+private:
+  LocalPuller& operator=(const LocalPuller&) = delete; // Not assignable.
+  LocalPuller(const LocalPuller&) = delete; // Not copyable.
+  // Process spawned by the constructor; pull() is dispatched to it.
+  process::Owned<LocalPullerProcess> process;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/message.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/message.hpp b/src/slave/containerizer/mesos/provisioner/docker/message.hpp
new file mode 100644
index 0000000..bb5248c
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/message.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESSAGES_DOCKER_PROVISIONER_HPP__
+#define __MESSAGES_DOCKER_PROVISIONER_HPP__
+
+#include <stout/strings.hpp>
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "slave/containerizer/mesos/provisioner/docker/message.pb.h"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Docker expects the image to be specified on the command line as:
+//   [REGISTRY_HOST[:REGISTRY_PORT]/]REPOSITORY[:TAG|@TYPE:DIGEST]
+//
+// This format is inherently ambiguous when dealing with repository
+// names that include forward slashes. To disambiguate, the docker
+// code looks for '.', or ':', or 'localhost' to decide if the
+// first component is a registry or a repository name. For more
+// detail, drill into the implementation of docker pull.
+//
+// TODO(bmahler): We currently store the digest as a tag, does
+// that make sense?
+//
+// TODO(bmahler): Validate based on docker's validation logic
+// and return a Try here.
+inline Image::Name parseImageName(std::string s)
+{
+  Image::Name name;
+
+  // A digest, when present, follows '@'; it is stored in the tag field.
+  if (strings::contains(s, "@")) {
+    const std::vector<std::string> halves = strings::split(s, "@");
+
+    name.set_tag(halves[1]);
+    s = halves[0];
+  }
+
+  // A tag, when present, follows the last ':'. A registry given as
+  // host:port also contains ':', so inspect the last component.
+  if (strings::contains(s, ":")) {
+    std::vector<std::string> pieces = strings::split(s, ":");
+    const std::string last = pieces.back();
+
+    // A slash there means the ':' was a registry port: no tag.
+    if (!strings::contains(last, "/")) {
+      pieces.pop_back();
+      name.set_tag(last);
+      s = strings::join(":", pieces);
+    }
+  }
+
+  // Default to the 'latest' tag when omitted.
+  if (name.tag().empty()) {
+    name.set_tag("latest");
+  }
+
+  // The first '/'-separated component is either the registry or the
+  // first part of the repository; as in the docker code, a '.', a ':'
+  // or the literal 'localhost' marks it as a registry.
+  const std::vector<std::string> components = strings::split(s, "/", 2);
+
+  const bool hasRegistry =
+    components.size() != 1 &&
+    (strings::contains(components[0], ".") ||
+     strings::contains(components[0], ":") ||
+     components[0] == "localhost");
+
+  if (hasRegistry) {
+    name.set_registry(components[0]);
+    name.set_repository(components[1]);
+  } else {
+    name.set_repository(s);
+  }
+
+  return name;
+}
+
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const Image::Name& name)
+{
+  // Prints "[<registry>/]<repository>:<tag>".
+  if (name.has_registry()) {
+    stream << name.registry() << "/";
+  }
+
+  return stream << name.repository() << ":" << name.tag();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESSAGES_DOCKER_PROVISIONER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/message.proto
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/message.proto b/src/slave/containerizer/mesos/provisioner/docker/message.proto
new file mode 100644
index 0000000..c33e0c5
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/message.proto
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos/mesos.proto";
+
+package mesos.internal.slave.docker;
+
+/**
+ * A Docker Image name and the layer ids of the layers that comprise the image.
+ * The layer_ids are ordered, with the root layer id (no parent layer id) first
+ * and the leaf layer id last.
+ */
+message Image {
+  message Name {
+    optional string registry = 1;
+    required string repository = 2;
+
+    // TODO(bmahler): This may hold a tag or a digest, split these?
+    required string tag = 3;
+  }
+
+  required Name name = 1;
+
+  // The order of the layers represents the dependency between layers.
+  repeated string layer_ids = 2;
+}
+
+
+message Images {
+  repeated Image images = 1; // One entry per stored image.
+}
+
+
+/**
+ * Protobuf for the Docker image manifest JSON schema:
+ * https://github.com/docker/distribution/blob/master/docs/spec/manifest-v2-1.md
+ */
+message DockerImageManifest {
+  required string name = 1;
+  required string tag = 2;
+  required string architecture = 3;
+
+  message FsLayers {
+    required string blobSum = 1;
+  }
+
+  repeated FsLayers fsLayers = 4;
+
+  message History {
+    message V1Compatibility {
+      required string id = 1;
+      required string parent = 2;
+    }
+
+    required V1Compatibility v1Compatibility = 1;
+  }
+
+  repeated History history = 5;
+  required uint32 schemaVersion = 6;
+
+  message Signatures {
+
+    // JOSE header (the signature is a JSON Web Signature).
+    message Header {
+
+      // JSON Web Key.
+      message Jwk {
+        required string crv = 1;
+        required string kid = 2;
+        required string kty = 3;
+        required string x = 4;
+        required string y = 5;
+      }
+
+      optional Jwk jwk = 1;
+      required string alg = 2;
+    }
+
+    required Header header = 1;
+    required string signature = 2;
+    required string protected = 3;
+  }
+
+  repeated Signatures signatures = 7;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
new file mode 100644
index 0000000..af6f5b8
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
+
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/owned.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/state.hpp"
+
+using namespace process;
+
+using std::list;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class MetadataManagerProcess : public process::Process<MetadataManagerProcess>
+{
+public:
+  explicit MetadataManagerProcess(const Flags& _flags) : flags(_flags) {}
+
+  ~MetadataManagerProcess() {}
+
+  Future<Nothing> recover();
+
+  Future<Image> put(
+      const Image::Name& name,
+      const std::vector<std::string>& layerIds);
+
+  Future<Option<Image>> get(const Image::Name& name);
+
+  // TODO(chenlily): Implement removal of unreferenced images.
+
+private:
+  // Write out metadata manager state to persistent store.
+  Try<Nothing> persist();
+
+  const Flags flags;
+
+  // Lookup table of the images held in memory, keyed by the stringified
+  // name of the Image.
+  // For example, "ubuntu:14.04" -> ubuntu:14.04 Image.
+  hashmap<std::string, Image> storedImages;
+};
+
+
+Try<Owned<MetadataManager>> MetadataManager::create(const Flags& flags)
+{
+  // The manager's constructor spawns the process handed to it.
+  return Owned<MetadataManager>(new MetadataManager(
+      Owned<MetadataManagerProcess>(new MetadataManagerProcess(flags))));
+}
+
+
+MetadataManager::MetadataManager(Owned<MetadataManagerProcess> _process)
+  : process(_process)
+{
+  process::spawn(CHECK_NOTNULL(process.get()));
+}
+
+// Terminates the spawned MetadataManagerProcess and then waits on it.
+MetadataManager::~MetadataManager()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+Future<Nothing> MetadataManager::recover()
+{
+  return dispatch(process.get(), &MetadataManagerProcess::recover);
+}
+
+
+Future<Image> MetadataManager::put(
+    const Image::Name& name,
+    const vector<string>& layerIds)
+{
+  // All state lives in the spawned process; forward the call to its
+  // put() and hand the resulting future back to the caller.
+  MetadataManagerProcess* actor = process.get();
+
+  return dispatch(actor, &MetadataManagerProcess::put, name, layerIds);
+}
+
+// Forwards the lookup to MetadataManagerProcess::get via dispatch().
+Future<Option<Image>> MetadataManager::get(const Image::Name& name)
+{
+  return dispatch(process.get(), &MetadataManagerProcess::get, name);
+}
+
+
+Future<Image> MetadataManagerProcess::put(
+    const Image::Name& name,
+    const vector<string>& layerIds)
+{
+  // Assemble the Image message from its name and ordered layer ids.
+  Image image;
+  image.mutable_name()->CopyFrom(name);
+  for (const string& id : layerIds) {
+    image.add_layer_ids(id);
+  }
+
+  // Record it under its stringified name, then checkpoint the table.
+  storedImages[stringify(name)] = image;
+
+  Try<Nothing> saved = persist();
+  if (saved.isError()) {
+    return Failure("Failed to save state of Docker images: " + saved.error());
+  }
+
+  return image;
+}
+
+
+Future<Option<Image>> MetadataManagerProcess::get(
+    const Image::Name& name)
+{
+  // Images are keyed by their stringified name.
+  const string key = stringify(name);
+
+  if (storedImages.contains(key)) {
+    return storedImages[key];
+  }
+  return None();
+}
+
+
+Try<Nothing> MetadataManagerProcess::persist()
+{
+  // Snapshot every in-memory image into a single Images message.
+  Images snapshot;
+  foreachvalue (const Image& image, storedImages) {
+    snapshot.add_images()->CopyFrom(image);
+  }
+
+  const string path = paths::getStoredImagesPath(flags.docker_store_dir);
+  Try<Nothing> checkpointed = state::checkpoint(path, snapshot);
+  if (checkpointed.isError()) {
+    return Error("Failed to perform checkpoint: " + checkpointed.error());
+  }
+
+  return Nothing();
+}
+
+
+Future<Nothing> MetadataManagerProcess::recover()
+{
+  string storedImagesPath = paths::getStoredImagesPath(flags.docker_store_dir);
+  if (!os::exists(storedImagesPath)) {
+    LOG(INFO) << "No images to load from disk. Docker provisioner image "
+              << "storage path '" << storedImagesPath << "' does not exist";
+    return Nothing();
+  }
+
+  Result<Images> images = ::protobuf::read<Images>(storedImagesPath);
+  if (images.isError()) {
+    return Failure("Failed to read protobuf for Docker provisioner image: " +
+                   images.error());
+  } else if (images.isNone()) {
+    // read() may yield None (e.g. empty file); get() on None would abort.
+    LOG(WARNING) << "Image storage path '" << storedImagesPath << "' is empty";
+    return Nothing();
+  }
+
+  foreach (const Image& image, images.get().images()) {
+    vector<string> missingLayerIds;
+    foreach (const string& layerId, image.layer_ids()) {
+      const string rootfsPath =
+        paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId);
+      if (!os::exists(rootfsPath)) {
+        missingLayerIds.push_back(layerId);
+      }
+    }
+
+    if (!missingLayerIds.empty()) {
+      LOG(WARNING) << "Skipped loading image  '" << stringify(image.name())
+                   << "' due to missing layers: " << stringify(missingLayerIds);
+      continue;
+    }
+    const string imageName = stringify(image.name());
+    if (storedImages.contains(imageName)) {
+      LOG(WARNING) << "Found duplicate image in recovery for image name '"
+                   << imageName << "'";
+    } else {
+      storedImages[imageName] = image;
+    }
+  }
+
+  LOG(INFO) << "Loaded " << storedImages.size() << " Docker images";
+  return Nothing();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
new file mode 100644
index 0000000..dbae8d8
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__
+#define __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__
+
+#include <list>
+#include <string>
+
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include "slave/containerizer/mesos/provisioner/provisioner.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward Declaration.
+class MetadataManagerProcess;
+
+/**
+ * The MetadataManager tracks the Docker images cached by the
+ * provisioner that are stored on disk. It keeps track of the layers
+ * that Docker images are composed of and recovers Image objects
+ * upon initialization by checking for dependent layers stored on disk.
+ * Currently, image layers are stored indefinitely, with no garbage
+ * collection of unreferenced image layers.
+ */
+class MetadataManager
+{
+public:
+  static Try<process::Owned<MetadataManager>> create(const Flags& flags);
+
+  ~MetadataManager();
+
+  /**
+   * Recover all stored Images and their layer references.
+   */
+  process::Future<Nothing> recover();
+
+  /**
+   * Create an Image, put it in metadata manager and persist the reference
+   * store state to disk.
+   *
+   * @param name     the name of the Docker image to place in the reference
+   *                 store.
+   * @param layerIds the list of layer ids that comprise the Docker image in
+   *                 order where the root layer's id (no parent layer) is first
+   *                 and the leaf layer's id is last.
+   */
+  process::Future<Image> put(
+      const Image::Name& name,
+      const std::vector<std::string>& layerIds);
+
+  /**
+   * Retrieve Image based on image name if it is among the Images
+   * stored in memory.
+   *
+   * @param name  the name of the Docker image to retrieve
+   */
+  process::Future<Option<Image>> get(const Image::Name& name);
+
+private:
+  explicit MetadataManager(process::Owned<MetadataManagerProcess> process);
+
+  MetadataManager(const MetadataManager&) = delete; // Not copyable.
+  MetadataManager& operator=(const MetadataManager&) = delete; // No assignment.
+
+  process::Owned<MetadataManagerProcess> process;
+};
+
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
new file mode 100644
index 0000000..e3392ea
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+
+#include <stout/path.hpp>
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace paths {
+
+// Returns '<storeDir>/staging'.
+string getStagingDir(const string& storeDir)
+{
+  return path::join(storeDir, "staging");
+}
+
+
+// Returns '<storeDir>/staging/XXXXXX'. The trailing 'XXXXXX' looks like
+// a mkdtemp-style template to be filled in by the caller (TODO: confirm
+// against the call sites).
+string getStagingTempDir(const string& storeDir)
+{
+  return path::join(getStagingDir(storeDir), "XXXXXX");
+}
+
+
+// Returns '<discoveryDir>/<name>.tar'.
+string getImageArchiveTarPath(
+    const string& discoveryDir,
+    const string& name)
+{
+  return path::join(discoveryDir, name + ".tar");
+}
+
+
+// Returns '<archivePath>/repositories'.
+string getImageArchiveRepositoriesPath(const string& archivePath)
+{
+  return path::join(archivePath, "repositories");
+}
+
+
+// Returns '<archivePath>/<layerId>'.
+string getImageArchiveLayerPath(
+    const string& archivePath,
+    const string& layerId)
+{
+  return path::join(archivePath, layerId);
+}
+
+
+// Returns '<archivePath>/<layerId>/json'.
+string getImageArchiveLayerManifestPath(
+    const string& archivePath,
+    const string& layerId)
+{
+  return path::join(getImageArchiveLayerPath(archivePath, layerId), "json");
+}
+
+
+// Returns '<archivePath>/<layerId>/layer.tar'.
+string getImageArchiveLayerTarPath(
+    const string& archivePath,
+    const string& layerId)
+{
+  return path::join(
+      getImageArchiveLayerPath(archivePath, layerId), "layer.tar");
+}
+
+
+// Returns '<archivePath>/<layerId>/rootfs'.
+string getImageArchiveLayerRootfsPath(
+    const string& archivePath,
+    const string& layerId)
+{
+  return path::join(getImageArchiveLayerPath(archivePath, layerId), "rootfs");
+}
+
+
+// Returns '<storeDir>/layers/<layerId>'.
+string getImageLayerPath(
+    const string& storeDir,
+    const string& layerId)
+{
+  return path::join(storeDir, "layers", layerId);
+}
+
+
+// Returns '<storeDir>/layers/<layerId>/rootfs'.
+string getImageLayerRootfsPath(
+    const string& storeDir,
+    const string& layerId)
+{
+  return path::join(getImageLayerPath(storeDir, layerId), "rootfs");
+}
+
+
+// Returns '<storeDir>/storedImages'.
+string getStoredImagesPath(const string& storeDir)
+{
+  return path::join(storeDir, "storedImages");
+}
+
+} // namespace paths {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
new file mode 100644
index 0000000..18beb2e
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_PATHS_HPP__
+#define __PROVISIONER_DOCKER_PATHS_HPP__
+
+#include <list>
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace paths {
+
+/**
+ * The Docker store file system layout is as follows:
+ * Image store dir ('--docker_store_dir' slave flag)
+ *    |--staging
+ *       |-- <temp_dir_archive>
+ *           |-- <layer_id>
+ *               |-- rootfs
+ *    |--layers
+ *       |--<layer_id>
+ *           |--rootfs
+ *    |--storedImages (file holding information on cached images)
+ */
+
+// The 'staging' directory of the store.
+std::string getStagingDir(const std::string& storeDir);
+
+
+// Path template for a temporary directory under the staging directory.
+std::string getStagingTempDir(const std::string& storeDir);
+
+
+// The '<name>.tar' image archive inside the discovery directory.
+std::string getImageArchiveTarPath(
+    const std::string& discoveryDir,
+    const std::string& name);
+
+
+// The 'repositories' entry of an image archive directory.
+std::string getImageArchiveRepositoriesPath(const std::string& archivePath);
+
+
+// The directory of a layer inside an image archive directory.
+std::string getImageArchiveLayerPath(
+    const std::string& archivePath,
+    const std::string& layerId);
+
+
+// The 'json' entry of a layer inside an image archive directory.
+std::string getImageArchiveLayerManifestPath(
+    const std::string& archivePath,
+    const std::string& layerId);
+
+
+// The 'layer.tar' entry of a layer inside an image archive directory.
+std::string getImageArchiveLayerTarPath(
+    const std::string& archivePath,
+    const std::string& layerId);
+
+
+// The 'rootfs' entry of a layer inside an image archive directory.
+std::string getImageArchiveLayerRootfsPath(
+    const std::string& archivePath,
+    const std::string& layerId);
+
+
+// The directory of a layer under the 'layers' directory of the store.
+std::string getImageLayerPath(
+    const std::string& storeDir,
+    const std::string& layerId);
+
+
+// The 'rootfs' entry of a layer under the 'layers' directory of the store.
+std::string getImageLayerRootfsPath(
+    const std::string& storeDir,
+    const std::string& layerId);
+
+
+// The 'storedImages' entry of the store.
+std::string getStoredImagesPath(const std::string& storeDir);
+
+} // namespace paths {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_PATHS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
new file mode 100644
index 0000000..f61f9e5
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
+
+using std::string;
+
+using process::Owned;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Instantiates the puller selected by the 'docker_puller' flag. Only the
+// "local" puller is handled here; any other value yields an Error.
+Try<Owned<Puller>> Puller::create(const Flags& flags)
+{
+  const string& kind = flags.docker_puller;
+
+  if (kind != "local") {
+    return Error("Unknown or unsupported docker puller: " + kind);
+  }
+
+  return Owned<Puller>(new LocalPuller(flags));
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
new file mode 100644
index 0000000..8010b8a
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_PULLER_HPP__
+#define __PROVISIONER_DOCKER_PULLER_HPP__
+
+#include <list>
+#include <utility>
+
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class Puller
+{
+public:
+  static Try<process::Owned<Puller>> create(const Flags& flags);
+
+  virtual ~Puller() {}
+
+  /**
+   * Pull the layers of a Docker image into the specified directory,
+   * and return the list of layer ids in that image in the right
+   * dependency order, together with the directory where the puller
+   * put each layer's changeset.
+   *
+   * @param name The name of the image.
+   * @param directory The target directory to store the layers.
+   * @return list of layers, each mapped to its local directory, ordered
+   *         by dependency.
+   */
+  virtual process::Future<std::list<std::pair<std::string, std::string>>> pull(
+      const docker::Image::Name& name,
+      const std::string& directory) = 0;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __PROVISIONER_DOCKER_PULLER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
new file mode 100644
index 0000000..5a01f1b
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/http.hpp>
+#include <process/io.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp"
+
+using std::string;
+using std::vector;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+
+using process::http::Request;
+using process::http::Response;
+using process::http::URL;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Shorthands for the nested RegistryClient types used in this file.
+using FileSystemLayerInfo = RegistryClient::FileSystemLayerInfo;
+
+using ManifestResponse = RegistryClient::ManifestResponse;
+
+// Timeout used by RegistryClient::getManifest() and
+// RegistryClient::getBlob() when the caller does not supply one.
+const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10);
+
+// Maximum size used by RegistryClient::getBlob() when the caller does
+// not supply one.
+const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096;
+
+// Port used for redirect locations that do not specify one.
+static const uint16_t DEFAULT_SSL_PORT = 443;
+
+// Process that performs the HTTP requests on behalf of RegistryClient.
+class RegistryClientProcess : public Process<RegistryClientProcess>
+{
+public:
+  // Creates the process along with a TokenManager for `authServer`.
+  // Returns an Error if the token manager cannot be created.
+  static Try<Owned<RegistryClientProcess>> create(
+      const URL& registryServer,
+      const URL& authServer,
+      const Option<RegistryClient::Credentials>& creds);
+
+  // Requests 'v2/<path>/manifests/<tag>' from the registry server and
+  // parses the response. `tag` defaults to "latest".
+  Future<RegistryClient::ManifestResponse> getManifest(
+      const string& path,
+      const Option<string>& tag,
+      const Duration& timeout);
+
+  // Requests 'v2/<path>/blobs/<digest>' from the registry server, writes
+  // the response body to `filePath` and returns the length of the body.
+  // NOTE: `maxSize` is not enforced yet.
+  Future<size_t> getBlob(
+      const string& path,
+      const Option<string>& digest,
+      const Path& filePath,
+      const Duration& timeout,
+      size_t maxSize);
+
+private:
+  RegistryClientProcess(
+      const URL& registryServer,
+      const Owned<TokenManager>& tokenManager,
+      const Option<RegistryClient::Credentials>& creds);
+
+  // Issues an HTTP GET for `url`. When `resend` is set, a
+  // "401 Unauthorized" or "307 Temporary Redirect" response triggers a
+  // follow-up request; `lastResponseStatus` carries the status of the
+  // previous response so that the same status is not retried twice.
+  Future<Response> doHttpGet(
+      const URL& url,
+      const Option<process::http::Headers>& headers,
+      const Duration& timeout,
+      bool resend,
+      const Option<string>& lastResponseStatus) const;
+
+  // Extracts the key/value attributes of the "WWW-Authenticate" header
+  // of `httpResponse`.
+  Try<process::http::Headers> getAuthenticationAttributes(
+      const Response& httpResponse) const;
+
+  const URL registryServer_;
+  Owned<TokenManager> tokenManager_;
+  const Option<RegistryClient::Credentials> credentials_;
+
+  RegistryClientProcess(const RegistryClientProcess&) = delete;
+  RegistryClientProcess& operator = (const RegistryClientProcess&) = delete;
+};
+
+
+// Builds the backing RegistryClientProcess and wraps it in a
+// RegistryClient. Fails if the process cannot be created.
+//
+// NOTE(review): `authServer` and `registryServer` are forwarded in the
+// opposite order to how they are named in this parameter list (both
+// RegistryClientProcess::create() and the RegistryClient constructor
+// take the registry server first). Either the names here are swapped
+// relative to the declaration in the header, or the two URLs end up
+// exchanged - confirm against registry_client.hpp and the callers.
+Try<Owned<RegistryClient>> RegistryClient::create(
+    const URL& registryServer,
+    const URL& authServer,
+    const Option<Credentials>& creds)
+{
+  Try<Owned<RegistryClientProcess>> actor =
+    RegistryClientProcess::create(authServer, registryServer, creds);
+
+  if (actor.isError()) {
+    return Error(actor.error());
+  }
+
+  RegistryClient* client =
+    new RegistryClient(authServer, registryServer, creds, actor.get());
+
+  return Owned<RegistryClient>(client);
+}
+
+
+// Stores the server URLs, the credentials and the backing process, then
+// spawns that process. Aborts if `process` does not hold a process.
+RegistryClient::RegistryClient(
+    const URL& registryServer,
+    const URL& authServer,
+    const Option<Credentials>& creds,
+    const Owned<RegistryClientProcess>& process)
+  : registryServer_(registryServer),
+    authServer_(authServer),
+    credentials_(creds),
+    process_(process)
+{
+  spawn(CHECK_NOTNULL(process_.get()));
+}
+
+
+// Terminates the backing process and waits for it to finish before the
+// owning handle is released.
+RegistryClient::~RegistryClient()
+{
+  RegistryClientProcess* actor = process_.get();
+
+  terminate(actor);
+  process::wait(actor);
+}
+
+
+// Dispatches the manifest request to the backing process. When no
+// timeout is given, DEFAULT_MANIFEST_TIMEOUT_SECS is used.
+Future<ManifestResponse> RegistryClient::getManifest(
+    const string& _path,
+    const Option<string>& _tag,
+    const Option<Duration>& _timeout)
+{
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getManifest,
+      _path,
+      _tag,
+      _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS));
+}
+
+
+// Dispatches the blob request to the backing process. When omitted, the
+// timeout falls back to DEFAULT_MANIFEST_TIMEOUT_SECS and the maximum
+// size to DEFAULT_MANIFEST_MAXSIZE_BYTES.
+Future<size_t> RegistryClient::getBlob(
+    const string& _path,
+    const Option<string>& _digest,
+    const Path& _filePath,
+    const Option<Duration>& _timeout,
+    const Option<size_t>& _maxSize)
+{
+  return dispatch(
+      process_.get(),
+      &RegistryClientProcess::getBlob,
+      _path,
+      _digest,
+      _filePath,
+      _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS),
+      _maxSize.getOrElse(DEFAULT_MANIFEST_MAXSIZE_BYTES));
+}
+
+
+// Creates a TokenManager for `authServer` and, on success, a process
+// bound to `registryServer` that uses it.
+Try<Owned<RegistryClientProcess>> RegistryClientProcess::create(
+    const URL& registryServer,
+    const URL& authServer,
+    const Option<RegistryClient::Credentials>& creds)
+{
+  Try<Owned<TokenManager>> tokenManager = TokenManager::create(authServer);
+
+  if (tokenManager.isSome()) {
+    return Owned<RegistryClientProcess>(
+        new RegistryClientProcess(registryServer, tokenManager.get(), creds));
+  }
+
+  return Error("Failed to create token manager: " + tokenManager.error());
+}
+
+
+// Stores the registry server URL, the token manager and the (optional)
+// credentials; performs no other work.
+RegistryClientProcess::RegistryClientProcess(
+    const URL& registryServer,
+    const Owned<TokenManager>& tokenMgr,
+    const Option<RegistryClient::Credentials>& creds)
+  : registryServer_(registryServer),
+    tokenManager_(tokenMgr),
+    credentials_(creds) {}
+
+
+// Parses the "WWW-Authenticate" header of `httpResponse`, which must have
+// the form 'Bearer <key>="<value>",<key>="<value>",...', and returns the
+// key/value pairs. Returns an Error if the header is missing or does not
+// have that form.
+Try<process::http::Headers> RegistryClientProcess::getAuthenticationAttributes(
+    const Response& httpResponse) const
+{
+  if (!httpResponse.headers.contains("WWW-Authenticate")) {
+    return Error("Failed to find WWW-Authenticate header value");
+  }
+
+  const string& challenge = httpResponse.headers.at("WWW-Authenticate");
+
+  // TODO(jojy): Look at various possibilities of auth response. We currently
+  // assume that the string will have realm information.
+  const vector<string> challengeParts = strings::tokenize(challenge, " ");
+  const bool isBearer =
+    (challengeParts.size() == 2) && (challengeParts[0] == "Bearer");
+
+  if (!isBearer) {
+    return Error("Invalid authentication header value: " + challenge);
+  }
+
+  const vector<string> params = strings::tokenize(challengeParts[1], ",");
+
+  process::http::Headers attributes;
+
+  foreach (const string& param, params) {
+    // Splitting on both '=' and '"' turns 'key="value"' into exactly two
+    // tokens: the key and the unquoted value.
+    const vector<string> keyValue = strings::tokenize(param, "=\"");
+
+    if (keyValue.size() != 2) {
+      return Error(
+          "Failed to get authentication attribute from response parameter " +
+          param);
+    }
+
+    attributes.insert({keyValue[0], keyValue[1]});
+  }
+
+  return attributes;
+}
+
+
+// Issues an HTTP GET for `url` and interprets the response status:
+//   - "200 OK": the response is returned.
+//   - "400 Bad Request": fails with the error messages found in the body.
+//   - "401 Unauthorized" (only if `resend`): a token is requested from the
+//     token manager and the request is repeated with a Bearer header.
+//   - "307 Temporary Redirect" (only if `resend`): the request is repeated
+//     once against the "Location" header.
+//   - anything else: fails.
+//
+// `lastResponseStatus` is the status of the previous attempt, if any;
+// receiving the same status twice in a row fails instead of retrying.
+// Every response is produced by a remote server, so nothing in it is
+// assumed to be well formed.
+Future<Response> RegistryClientProcess::doHttpGet(
+    const URL& url,
+    const Option<process::http::Headers>& headers,
+    const Duration& timeout,
+    bool resend,
+    const Option<string>& lastResponseStatus) const
+{
+  return process::http::get(url, headers)
+    .after(timeout, [](
+        const Future<Response>& httpResponseFuture) -> Future<Response> {
+      return Failure("Response timeout");
+    })
+    .then(defer(self(), [=](
+        const Response& httpResponse) -> Future<Response> {
+      VLOG(1) << "Response status: " + httpResponse.status;
+
+      // Set the future if we get a OK response.
+      if (httpResponse.status == "200 OK") {
+        return httpResponse;
+      } else if (httpResponse.status == "400 Bad Request") {
+        Try<JSON::Object> errorResponse =
+          JSON::parse<JSON::Object>(httpResponse.body);
+
+        if (errorResponse.isError()) {
+          return Failure("Failed to parse bad request response JSON: " +
+                         errorResponse.error());
+        }
+
+        std::ostringstream out;
+        bool first = true;
+        Result<JSON::Array> errorObjects =
+          errorResponse.get().find<JSON::Array>("errors");
+
+        if (errorObjects.isError()) {
+          return Failure("Failed to find 'errors' in bad request response: " +
+                         errorObjects.error());
+        } else if (errorObjects.isNone()) {
+          return Failure("Errors not found in bad request response");
+        }
+
+        foreach (const JSON::Value& error, errorObjects.get().values) {
+          // Skip entries that are not objects: calling `as<JSON::Object>()`
+          // on any other JSON type is invalid.
+          if (!error.is<JSON::Object>()) {
+            continue;
+          }
+
+          Result<JSON::String> message =
+            error.as<JSON::Object>().find<JSON::String>("message");
+
+          if (message.isError()) {
+            return Failure("Failed to parse bad request error message: " +
+                           message.error());
+          } else if (message.isNone()) {
+            continue;
+          }
+
+          if (first) {
+            out << message.get().value;
+            first = false;
+          } else {
+            out << ", " << message.get().value;
+          }
+        }
+
+        return Failure("Received Bad request, errors: [" + out.str() + "]");
+      }
+
+      // Prevent infinite recursion.
+      if (lastResponseStatus.isSome() &&
+          (lastResponseStatus.get() == httpResponse.status)) {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+
+      // If resend is not set, we dont try again and stop here.
+      if (!resend) {
+        return Failure("Bad response: " + httpResponse.status);
+      }
+
+      // Handle 401 Unauthorized.
+      if (httpResponse.status == "401 Unauthorized") {
+        Try<process::http::Headers> authAttributes =
+          getAuthenticationAttributes(httpResponse);
+
+        if (authAttributes.isError()) {
+          return Failure(
+              "Failed to get authentication attributes: " +
+              authAttributes.error());
+        }
+
+        // Both attributes are required to request a token; `at()` must
+        // not be called for a key that the server did not send.
+        if (!authAttributes.get().contains("service") ||
+            !authAttributes.get().contains("scope")) {
+          return Failure(
+              "Failed to find 'service' and 'scope' in authentication "
+              "attributes");
+        }
+
+        // TODO(jojy): Currently only handling TLS/cert authentication.
+        Future<Token> tokenResponse = tokenManager_->getToken(
+          authAttributes.get().at("service"),
+          authAttributes.get().at("scope"),
+          None());
+
+        return tokenResponse
+          .after(timeout, [=](
+              Future<Token> tokenResponse) -> Future<Token> {
+            tokenResponse.discard();
+            return Failure("Token response timeout");
+          })
+          .then(defer(self(), [=](
+              const Future<Token>& tokenResponse) {
+            // Send request with acquired token.
+            process::http::Headers authHeaders = {
+              {"Authorization", "Bearer " + tokenResponse.get().raw}
+            };
+
+            return doHttpGet(
+                url,
+                authHeaders,
+                timeout,
+                true,
+                httpResponse.status);
+        }));
+      } else if (httpResponse.status == "307 Temporary Redirect") {
+        // Handle redirect.
+
+        // TODO(jojy): Add redirect functionality in http::get.
+
+        auto toURL = [](
+            const string& urlString) -> Try<URL> {
+          // TODO(jojy): Need to add functionality to URL class that parses a
+          // string to its URL components. For now, assuming:
+          //  - scheme is https
+          //  - path always ends with /
+
+          static const string schemePrefix = "https://";
+
+          // The scheme has to be a prefix: the code below strips exactly
+          // `schemePrefix.length()` leading characters.
+          if (!strings::startsWith(urlString, schemePrefix)) {
+            return Error(
+                "Failed to find expected token '" + schemePrefix +
+                "' in redirect url");
+          }
+
+          const string schemeSuffix = urlString.substr(schemePrefix.length());
+
+          const vector<string> components =
+            strings::tokenize(schemeSuffix, "/");
+
+          // Nothing follows the scheme (e.g. "https://").
+          if (components.empty()) {
+            return Error("Failed to find host in redirect url");
+          }
+
+          const string path = schemeSuffix.substr(components[0].length());
+
+          const vector<string> addrComponents =
+            strings::tokenize(components[0], ":");
+
+          uint16_t port = DEFAULT_SSL_PORT;
+          string domain = components[0];
+
+          // Parse the port.
+          if (addrComponents.size() == 2) {
+            domain = addrComponents[0];
+
+            Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]);
+            if (tryPort.isError()) {
+              return Error(
+                  "Failed to parse location: " + urlString + " for port.");
+            }
+
+            port = tryPort.get();
+          }
+
+          return URL("https", domain, port, path);
+        };
+
+        if (httpResponse.headers.find("Location") ==
+            httpResponse.headers.end()) {
+          return Failure(
+              "Invalid redirect response: 'Location' not found in headers.");
+        }
+
+        const string& location = httpResponse.headers.at("Location");
+        Try<URL> tryUrl = toURL(location);
+        if (tryUrl.isError()) {
+          return Failure(
+              "Failed to parse '" + location + "': " + tryUrl.error());
+        }
+
+        // The redirected request is not resent again.
+        return doHttpGet(
+            tryUrl.get(),
+            headers,
+            timeout,
+            false,
+            httpResponse.status);
+      } else {
+        return Failure("Invalid response: " + httpResponse.status);
+      }
+    }));
+}
+
+
+// Requests 'v2/<path>/manifests/<tag>' from the registry server (`tag`
+// defaults to "latest") and converts the response into a
+// ManifestResponse.
+//
+// The response must carry a "Docker-Content-Digest" header and a JSON
+// body with a "name" string and "fsLayers" / "history" arrays of equal
+// length. Entry i of "fsLayers" supplies the "blobSum" and entry i of
+// "history" supplies, through its embedded "v1Compatibility" JSON
+// string, the "id" of layer i.
+//
+// Fails if `path` or the tag contains a space, if the request fails, or
+// if the response cannot be parsed as described above.
+Future<ManifestResponse> RegistryClientProcess::getManifest(
+    const string& path,
+    const Option<string>& tag,
+    const Duration& timeout)
+{
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  string repoTag = tag.getOrElse("latest");
+  if (strings::contains(repoTag, " ")) {
+    return Failure("Invalid repository tag: " + repoTag);
+  }
+
+  URL manifestURL(registryServer_);
+  manifestURL.path =
+    "v2/" + path + "/manifests/" + repoTag;
+
+  // NOTE: The `Result`s below are tested with `!isSome()` rather than
+  // `isNone()` so that an error result is also rejected before `get()`
+  // is called on it.
+  auto getManifestResponse = [](
+      const Response& httpResponse) -> Try<ManifestResponse> {
+    if (!httpResponse.headers.contains("Docker-Content-Digest")) {
+      return Error("Docker-Content-Digest header missing in response");
+    }
+
+    Try<JSON::Object> responseJSON =
+      JSON::parse<JSON::Object>(httpResponse.body);
+
+    if (responseJSON.isError()) {
+      return Error(responseJSON.error());
+    }
+
+    Result<JSON::String> name = responseJSON.get().find<JSON::String>("name");
+    if (!name.isSome()) {
+      return Error("Failed to find \"name\" in manifest response");
+    }
+
+    Result<JSON::Array> fsLayers =
+      responseJSON.get().find<JSON::Array>("fsLayers");
+
+    if (!fsLayers.isSome()) {
+      return Error("Failed to find \"fsLayers\" in manifest response");
+    }
+
+    Result<JSON::Array> historyArray =
+      responseJSON.get().find<JSON::Array>("history");
+
+    if (!historyArray.isSome()) {
+      return Error("Failed to find \"history\" in manifest response");
+    }
+
+    // The two arrays are indexed in lock step below.
+    if (historyArray.get().values.size() != fsLayers.get().values.size()) {
+      return Error(
+          "\"history\" and \"fsLayers\" array count mismatch "
+          "in manifest response");
+    }
+
+    vector<FileSystemLayerInfo> fsLayerInfoList;
+    size_t index = 0;
+
+    foreach (const JSON::Value& layer, fsLayers.get().values) {
+      if (!layer.is<JSON::Object>()) {
+        return Error(
+            "Failed to parse layer as a JSON object for index: " +
+            stringify(index));
+      }
+
+      const JSON::Object& layerInfoJSON = layer.as<JSON::Object>();
+
+      // Get blobsum for layer.
+      const Result<JSON::String> blobSumInfo =
+        layerInfoJSON.find<JSON::String>("blobSum");
+
+      if (!blobSumInfo.isSome()) {
+        return Error("Failed to find \"blobSum\" in manifest response");
+      }
+
+      // Get history for layer.
+      if (!historyArray.get().values[index].is<JSON::Object>()) {
+        return Error(
+            "Failed to parse history as a JSON object for index: " +
+            stringify(index));
+      }
+      const JSON::Object& historyObj =
+        historyArray.get().values[index].as<JSON::Object>();
+
+      // Get layer id.
+      const Result<JSON::String> v1CompatibilityJSON =
+        historyObj.find<JSON::String>("v1Compatibility");
+
+      if (!v1CompatibilityJSON.isSome()) {
+        return Error(
+            "Failed to obtain layer v1 compability json in manifest for layer: "
+            + stringify(index));
+      }
+
+      // "v1Compatibility" is itself a JSON document encoded as a string.
+      Try<JSON::Object> v1CompatibilityObj =
+        JSON::parse<JSON::Object>(v1CompatibilityJSON.get().value);
+
+      if (!v1CompatibilityObj.isSome()) {
+        return Error(
+            "Failed to parse v1 compability json in manifest for layer: "
+            + stringify(index));
+      }
+
+      const Result<JSON::String> id =
+        v1CompatibilityObj.get().find<JSON::String>("id");
+
+      if (!id.isSome()) {
+        return Error(
+            "Failed to find \"id\" in manifest for layer: " + stringify(index));
+      }
+
+      fsLayerInfoList.emplace_back(
+          FileSystemLayerInfo{
+            blobSumInfo.get().value,
+            id.get().value,
+          });
+
+      index++;
+    }
+
+    return ManifestResponse {
+      name.get().value,
+      httpResponse.headers.at("Docker-Content-Digest"),
+      fsLayerInfoList,
+    };
+  };
+
+  return doHttpGet(manifestURL, None(), timeout, true, None())
+    .then([getManifestResponse] (
+        const Response& response) -> Future<ManifestResponse> {
+      Try<ManifestResponse> manifestResponse = getManifestResponse(response);
+
+      if (manifestResponse.isError()) {
+        return Failure(
+            "Failed to parse manifest response: " + manifestResponse.error());
+      }
+
+      return manifestResponse.get();
+    });
+}
+
+
+// Requests 'v2/<path>/blobs/<digest>' from the registry server and writes
+// the response body to `filePath`, creating the parent directory first.
+// Returns the length of the response body.
+//
+// Fails if the directory cannot be created, if `path` contains a space,
+// if the request fails or if the file cannot be opened. `maxSize` is not
+// enforced yet.
+Future<size_t> RegistryClientProcess::getBlob(
+    const string& path,
+    const Option<string>& digest,
+    const Path& filePath,
+    const Duration& timeout,
+    size_t maxSize)
+{
+  // TODO(jojy): Return more state, for example - if the directory is new.
+  // TODO(jojy): This currently leaves a residue in failure cases. Would be
+  // ideal if we can completely rollback.
+  Try<Nothing> created = os::mkdir(filePath.dirname(), true);
+  if (created.isError()) {
+    return Failure(
+        "Failed to create directory to download blob: " + created.error());
+  }
+
+  if (strings::contains(path, " ")) {
+    return Failure("Invalid repository path: " + path);
+  }
+
+  URL blobURL(registryServer_);
+  blobURL.path = "v2/" + path + "/blobs/" + digest.getOrElse("");
+
+  return doHttpGet(blobURL, None(), timeout, true, None())
+    .then([filePath](const Response& httpResponse) -> Future<size_t> {
+      // TODO(jojy): Add verification step.
+      // TODO(jojy): Add check for max size.
+      size_t size = httpResponse.body.length();
+
+      Try<int> fd = os::open(
+          filePath.value,
+          O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
+          S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+      if (fd.isError()) {
+        return Failure("Failed to open file '" + filePath.value + "': " +
+                       fd.error());
+      }
+
+      // The descriptor is closed once the write has finished, whether it
+      // succeeded or not.
+      return process::io::write(fd.get(), httpResponse.body)
+        .then([size](const Future<Nothing>&) { return size; })
+        .onAny([fd]() { os::close(fd.get()); } );
+    });
+}
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
new file mode 100644
index 0000000..1d3377e
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__
+#define __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__
+
+#include <string>
+#include <vector>
+
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/path.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Forward declarations.
+class RegistryClientProcess;
+
+
+/**
+ * Client-side handle for talking to a Docker registry server.
+ *
+ * Instances are obtained through create(); getManifest() and getBlob()
+ * report their results asynchronously as process::Future values. The
+ * class is neither copyable nor assignable.
+ */
+class RegistryClient
+{
+public:
+  /**
+   * Encapsulates information about a file system layer.
+   */
+  struct FileSystemLayerInfo {
+    // TODO(jojy): This string includes the checksum type also now. Need to
+    // separate this into checksum method and checksum.
+    const std::string checksumInfo;
+    const std::string layerId;
+  };
+
+  /**
+   * Encapsulates response of "GET Manifest" request.
+   *
+   * Reference: https://docs.docker.com/registry/spec/api
+   */
+  struct ManifestResponse {
+    const std::string name;
+    const std::string digest;
+    const std::vector<FileSystemLayerInfo> fsLayerInfoList;
+  };
+
+  /**
+   * Encapsulates auth credentials for the client sessions.
+   * TODO(jojy): Secure heap to protect the credentials.
+   */
+  struct Credentials {
+    /**
+     * UserId for basic authentication.
+     */
+    const Option<std::string> userId;
+    /**
+     * Password for basic authentication.
+     */
+    const Option<std::string> password;
+    /**
+     * Account for fetching data from registry.
+     */
+    const Option<std::string> account;
+  };
+
+  /**
+   * Factory method for creating RegistryClient objects.
+   *
+   * @param registryServer URL of docker registry server.
+   * @param authServer URL of authorization server.
+   * @param credentials credentials for client session (optional).
+   * @return RegistryClient on Success.
+   *         Error on failure.
+   */
+  static Try<process::Owned<RegistryClient>> create(
+      const process::http::URL& registryServer,
+      const process::http::URL& authServer,
+      const Option<Credentials>& credentials);
+
+  /**
+   * Fetches manifest for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param tag unique tag that identifies the repository. Will default to
+   *    latest.
+   * @param timeout Maximum time after which the request will time out and
+   *    return a failure. Will default to RESPONSE_TIMEOUT.
+   * @return ManifestResponse on success.
+   *         Failure on process failure.
+   */
+  process::Future<ManifestResponse> getManifest(
+      const std::string& path,
+      const Option<std::string>& tag,
+      const Option<Duration>& timeout);
+
+  /**
+   * Fetches blob for a repository from the client's remote registry server.
+   *
+   * @param path path of the repository on the registry.
+   * @param digest digest of the blob (from manifest).
+   * @param filePath file path to store the fetched blob.
+   * @param timeout Maximum time after which the request will time out and
+   *    return a failure. Will default to RESPONSE_TIMEOUT.
+   * @param maxSize Maximum size of the response that is acceptable. Will
+   *    default to MAX_RESPONSE_SIZE.
+   * @return size of downloaded blob on success.
+   *         Failure in case of any errors.
+   */
+  process::Future<size_t> getBlob(
+      const std::string& path,
+      const Option<std::string>& digest,
+      const Path& filePath,
+      const Option<Duration>& timeout,
+      const Option<size_t>& maxSize);
+
+  ~RegistryClient();
+
+private:
+  // Private: instances are created through the create() factory.
+  RegistryClient(
+    const process::http::URL& registryServer,
+    const process::http::URL& authServer,
+    const Option<Credentials>& credentials,
+    const process::Owned<RegistryClientProcess>& process);
+
+  static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS;
+  static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES;
+
+  const process::http::URL registryServer_;
+  const process::http::URL authServer_;
+  const Option<Credentials> credentials_;
+  process::Owned<RegistryClientProcess> process_;
+
+  // Not copyable, not assignable.
+  RegistryClient(const RegistryClient&) = delete;
+  RegistryClient& operator=(const RegistryClient&) = delete;
+};
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/spec.cpp b/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
new file mode 100644
index 0000000..2703b5d
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/foreach.hpp>
+#include <stout/json.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/spec.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace spec {
+
+// Checks whether the given image manifest conforms to the Docker spec.
+// Returns the first violation encountered as an Error, or None() when
+// every check passes.
+Option<Error> validateManifest(const DockerImageManifest& manifest)
+{
+  const int layerCount = manifest.fslayers_size();
+  const int historyCount = manifest.history_size();
+
+  // Each of the required repeated fields must hold at least one entry.
+  if (layerCount <= 0) {
+    return Error("FsLayers field must have at least one blobSum");
+  }
+
+  if (historyCount <= 0) {
+    return Error("History field must have at least one v1Compatibility");
+  }
+
+  if (manifest.signatures_size() <= 0) {
+    return Error("Signatures field must have at least one signature");
+  }
+
+  // There must be exactly one v1Compatibility entry per blobSum.
+  if (layerCount != historyCount) {
+    return Error("Size of blobSum and v1Compatibility must be equal");
+  }
+
+  // Every blobSum has to contain a ':' character.
+  for (int i = 0; i < layerCount; ++i) {
+    if (manifest.fslayers(i).blobsum().find(':') == string::npos) {
+      return Error("Incorrect blobSum format");
+    }
+  }
+
+  return None();
+}
+
+
+// Converts a JSON object into a DockerImageManifest protobuf and then
+// runs validateManifest() on the result. A failure of either step is
+// reported as an Error; otherwise the parsed manifest is returned.
+Try<docker::DockerImageManifest> parse(const JSON::Object& json)
+{
+  Try<docker::DockerImageManifest> parsed =
+    protobuf::parse<docker::DockerImageManifest>(json);
+
+  if (parsed.isError()) {
+    return Error("Protobuf parse failed: " + parsed.error());
+  }
+
+  const Option<Error> violation = validateManifest(parsed.get());
+  if (violation.isNone()) {
+    return parsed.get();
+  }
+
+  return Error("Docker Image Manifest Validation failed: " +
+               violation.get().message);
+}
+
+} // namespace spec {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/spec.hpp b/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
new file mode 100644
index 0000000..96e8d6d
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_SPEC_HPP__
+#define __PROVISIONER_DOCKER_SPEC_HPP__
+
+#include <stout/error.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+
+#include <mesos/mesos.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace spec {
+
+// Validate if the specified image manifest conforms to the Docker spec.
+Option<Error> validateManifest(const docker::DockerImageManifest& manifest);
+
+// TODO(Gilbert): add validations here, e.g., Manifest, Blob, Layout, ImageID.
+
+// Parse the DockerImageManifest from the specified JSON object.
+Try<docker::DockerImageManifest> parse(const JSON::Object& json);
+
+} // namespace spec {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_SPEC_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
new file mode 100644
index 0000000..bb02d65
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
+
+#include <list>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/json.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/subprocess.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/flags.hpp"
+
+using namespace process;
+
+using std::list;
+using std::pair;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Libprocess process backing docker::Store. It holds the flags, a
+// MetadataManager and a Puller, and implements image lookup, pulling
+// and layer placement on top of them.
+class StoreProcess : public Process<StoreProcess>
+{
+public:
+  StoreProcess(
+      const Flags& _flags,
+      const Owned<MetadataManager>& _metadataManager,
+      const Owned<Puller>& _puller)
+    : flags(_flags), metadataManager(_metadataManager), puller(_puller) {}
+
+  ~StoreProcess() {}
+
+  // Forwards to MetadataManager::recover().
+  Future<Nothing> recover();
+
+  // Returns the rootfs directories of the layers of the given image.
+  // Fails if the image is not a Docker image. When the metadata manager
+  // returns no image for the parsed name, the image is pulled first.
+  Future<vector<string>> get(const mesos::Image& image);
+
+private:
+  // Continuation of get(): returns `image` if it is set; otherwise pulls
+  // the image into a temporary staging directory, moves its layers into
+  // the store and records it through the metadata manager.
+  Future<Image> _get(
+      const Image::Name& name,
+      const Option<Image>& image);
+
+  // Maps every layer id of `image` to its rootfs path in the store.
+  Future<vector<string>> __get(const Image& image);
+
+  // Calls moveLayer() for every (layer id, path) pair and, once all of
+  // them have completed, yields the layer ids in the order given.
+  Future<vector<string>> moveLayers(
+      const std::string& staging,
+      const std::list<pair<string, string>>& layerPaths);
+
+  // Records the image via MetadataManager::put().
+  Future<Image> storeImage(
+      const Image::Name& name,
+      const std::vector<std::string>& layerIds);
+
+  // Renames the layer found at `layerPath.second` to the store's rootfs
+  // path for layer id `layerPath.first`.
+  Future<Nothing> moveLayer(const pair<string, string>& layerPath);
+
+  const Flags flags;
+  Owned<MetadataManager> metadataManager;
+  Owned<Puller> puller;
+};
+
+
+// Builds a Docker store from the given flags: creates the puller, makes
+// sure the store directory and its staging directory exist, creates the
+// metadata manager, and wraps everything in a StoreProcess-backed Store.
+// Any failing step is returned as an Error.
+Try<Owned<slave::Store>> Store::create(const Flags& flags)
+{
+  Try<Owned<Puller>> puller = Puller::create(flags);
+  if (puller.isError()) {
+    return Error("Failed to create Docker puller: " + puller.error());
+  }
+
+  const string& storeDir = flags.docker_store_dir;
+  if (!os::exists(storeDir)) {
+    Try<Nothing> created = os::mkdir(storeDir);
+    if (created.isError()) {
+      return Error(
+          "Failed to create Docker store directory: " + created.error());
+    }
+  }
+
+  const string stagingDir = paths::getStagingDir(storeDir);
+  if (!os::exists(stagingDir)) {
+    Try<Nothing> created = os::mkdir(stagingDir);
+    if (created.isError()) {
+      return Error(
+          "Failed to create Docker store staging directory: " +
+          created.error());
+    }
+  }
+
+  Try<Owned<MetadataManager>> metadataManager = MetadataManager::create(flags);
+  if (metadataManager.isError()) {
+    return Error(metadataManager.error());
+  }
+
+  Owned<StoreProcess> storeProcess(
+      new StoreProcess(flags, metadataManager.get(), puller.get()));
+
+  return Owned<slave::Store>(new Store(storeProcess));
+}
+
+
+// Takes the given StoreProcess and spawns it. The pointer is checked
+// with CHECK_NOTNULL before being handed to process::spawn().
+Store::Store(const Owned<StoreProcess>& _process) : process(_process)
+{
+  process::spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+// Terminates the StoreProcess and waits for it before the Store goes away.
+Store::~Store()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+// Dispatches to StoreProcess::recover() and returns its future.
+Future<Nothing> Store::recover()
+{
+  return dispatch(process.get(), &StoreProcess::recover);
+}
+
+
+// Dispatches to StoreProcess::get() with the given image and returns
+// its future.
+Future<vector<string>> Store::get(const mesos::Image& image)
+{
+  return dispatch(process.get(), &StoreProcess::get, image);
+}
+
+
+// Entry point for image lookups. Only Docker images are accepted. The
+// image name is parsed and looked up in the metadata manager; _get()
+// then supplies the image (pulling it if the lookup returned nothing)
+// and __get() turns it into the list of layer rootfs directories.
+Future<vector<string>> StoreProcess::get(const mesos::Image& image)
+{
+  if (image.type() == mesos::Image::DOCKER) {
+    Image::Name name = parseImageName(image.docker().name());
+
+    return metadataManager->get(name)
+      .then(defer(self(), &Self::_get, name, lambda::_1))
+      .then(defer(self(), &Self::__get, lambda::_1));
+  }
+
+  return Failure("Docker provisioner store only supports Docker images");
+}
+
+
+// Supplies the stored image for `name`.
+//
+// If `image` is set (the metadata manager already had it), it is
+// returned as is. Otherwise a fresh temporary staging directory is
+// created, the image is pulled into it, its layers are moved into the
+// store and the image is recorded via storeImage(). The staging
+// directory is removed once that chain completes, whatever the outcome;
+// a failed removal is only logged.
+Future<Image> StoreProcess::_get(
+    const Image::Name& name,
+    const Option<Image>& image)
+{
+  if (image.isSome()) {
+    return image.get();
+  }
+
+  Try<string> staging =
+    os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
+
+  if (staging.isError()) {
+    // Include the underlying reason so the failure can be diagnosed.
+    return Failure(
+        "Failed to create a staging directory: " + staging.error());
+  }
+
+  return puller->pull(name, staging.get())
+    .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1))
+    .then(defer(self(), &Self::storeImage, name, lambda::_1))
+    .onAny([staging]() {
+      Try<Nothing> rmdir = os::rmdir(staging.get());
+      if (rmdir.isError()) {
+        LOG(WARNING) << "Failed to remove staging directory: " << rmdir.error();
+      }
+    });
+}
+
+
+// Translates the layer ids recorded for `image` into the corresponding
+// rootfs paths under the Docker store directory, preserving their order.
+Future<vector<string>> StoreProcess::__get(const Image& image)
+{
+  vector<string> rootfsPaths;
+
+  for (const string& layerId : image.layer_ids()) {
+    rootfsPaths.push_back(
+        paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId));
+  }
+
+  return rootfsPaths;
+}
+
+
+// Recovery is delegated entirely to the metadata manager.
+Future<Nothing> StoreProcess::recover()
+{
+  return metadataManager->recover();
+}
+
+
+// Moves every pulled layer into the store via moveLayer(). Once all of
+// the moves have completed, yields the layer ids in the same order as
+// `layerPaths`. Each entry of `layerPaths` is a (layer id, path) pair.
+// The `staging` argument is not used by this function.
+Future<vector<string>> StoreProcess::moveLayers(
+    const string& staging,
+    const list<pair<string, string>>& layerPaths)
+{
+  list<Future<Nothing>> moves;
+  vector<string> layerIds;
+
+  for (const pair<string, string>& entry : layerPaths) {
+    moves.push_back(moveLayer(entry));
+    layerIds.push_back(entry.first);
+  }
+
+  return collect(moves)
+    .then([layerIds]() { return layerIds; });
+}
+
+
+// Records the image name together with its layer ids through
+// MetadataManager::put() and returns the resulting future.
+Future<Image> StoreProcess::storeImage(
+    const Image::Name& name,
+    const vector<string>& layerIds)
+{
+  return metadataManager->put(name, layerIds);
+}
+
+
+// Moves a single layer into the store.
+//
+// `layerPath.first` is the layer id and `layerPath.second` the path the
+// layer currently lives at. The layer's directory in the store is
+// created if it does not exist yet, and the source path is then renamed
+// to the layer's rootfs path. Every failing step results in a Failure.
+//
+// NOTE(review): if the rootfs destination already exists and is not
+// empty, the rename presumably fails -- confirm whether an already
+// stored layer can reach this code path.
+Future<Nothing> StoreProcess::moveLayer(const pair<string, string>& layerPath)
+{
+  const string& layerId = layerPath.first;
+  const string& sourcePath = layerPath.second;
+
+  if (!os::exists(sourcePath)) {
+    return Failure("Unable to find layer '" + layerId + "' in '" +
+                   sourcePath + "'");
+  }
+
+  const string layerDir =
+    paths::getImageLayerPath(flags.docker_store_dir, layerId);
+
+  if (!os::exists(layerDir)) {
+    Try<Nothing> created = os::mkdir(layerDir);
+    if (created.isError()) {
+      return Failure("Failed to create layer path in store for id '" +
+                     layerId + "': " + created.error());
+    }
+  }
+
+  const string rootfsDir =
+    paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId);
+
+  Try<Nothing> moved = os::rename(sourcePath, rootfsDir);
+  if (moved.isError()) {
+    return Failure("Failed to move layer '" + layerId +
+                   "' to store directory: " + moved.error());
+  }
+
+  return Nothing();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/store.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.hpp b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
new file mode 100644
index 0000000..95e46b9
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_STORE_HPP__
+#define __PROVISIONER_DOCKER_STORE_HPP__
+
+#include <string>
+
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+
+#include "slave/containerizer/mesos/provisioner/store.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward Declarations.
+class Puller;
+class StoreProcess;
+
+
+// Store fetches the Docker images and stores them on disk.
+//
+// This class is a thin front end: the work is carried out by a
+// StoreProcess that is spawned on construction and terminated on
+// destruction.
+class Store : public slave::Store
+{
+public:
+  // Creates a Docker store from the given flags. Returns an Error if
+  // the puller, the store or staging directory, or the metadata manager
+  // cannot be created.
+  static Try<process::Owned<slave::Store>> create(const Flags& flags);
+
+  // Terminates the underlying StoreProcess and waits for it.
+  ~Store();
+
+  // Dispatched to StoreProcess::recover().
+  process::Future<Nothing> recover();
+
+  // Dispatched to StoreProcess::get(); yields the rootfs directories of
+  // the layers of the given image.
+  process::Future<std::vector<std::string>> get(const mesos::Image& image);
+
+private:
+  // Spawns the given process; use create() to obtain instances.
+  explicit Store(const process::Owned<StoreProcess>& _process);
+
+  Store& operator=(const Store&) = delete; // Not assignable.
+  Store(const Store&) = delete; // Not copyable.
+
+  process::Owned<StoreProcess> process;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_STORE_HPP__