You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/10/26 19:40:28 UTC
[04/12] mesos git commit: Relocated MesosContainerizer specific files
to the correct location.
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
new file mode 100644
index 0000000..f314f20
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/json.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/subprocess.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
+
+using namespace process;
+
+using std::list;
+using std::pair;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class LocalPullerProcess : public process::Process<LocalPullerProcess>
+{
+public:
+ LocalPullerProcess(const Flags& _flags) : flags(_flags) {}
+
+ ~LocalPullerProcess() {}
+
+ process::Future<list<pair<string, string>>> pull(
+ const Image::Name& name,
+ const string& directory);
+
+private:
+ process::Future<Nothing> untarImage(
+ const std::string& tarPath,
+ const std::string& directory);
+
+ process::Future<list<pair<string, string>>> putImage(
+ const Image::Name& name,
+ const std::string& directory);
+
+ process::Future<list<pair<string, string>>> putLayers(
+ const std::string& directory,
+ const std::vector<std::string>& layerIds);
+
+ process::Future<pair<string, string>> putLayer(
+ const std::string& directory,
+ const std::string& layerId);
+
+ const Flags flags;
+};
+
+
+LocalPuller::LocalPuller(const Flags& flags)
+{
+ process = Owned<LocalPullerProcess>(new LocalPullerProcess(flags));
+ process::spawn(process.get());
+}
+
+
+LocalPuller::~LocalPuller()
+{
+ process::terminate(process.get());
+ process::wait(process.get());
+}
+
+
+Future<list<pair<string, string>>> LocalPuller::pull(
+ const Image::Name& name,
+ const string& directory)
+{
+ return dispatch(process.get(), &LocalPullerProcess::pull, name, directory);
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::pull(
+ const Image::Name& name,
+ const string& directory)
+{
+ const string tarPath = paths::getImageArchiveTarPath(
+ flags.docker_local_archives_dir,
+ stringify(name));
+
+ if (!os::exists(tarPath)) {
+ return Failure("Failed to find archive for image '" + stringify(name) +
+ "' at '" + tarPath + "'");
+ }
+
+ return untarImage(tarPath, directory)
+ .then(defer(self(), &Self::putImage, name, directory));
+}
+
+
+Future<Nothing> LocalPullerProcess::untarImage(
+ const string& tarPath,
+ const string& directory)
+{
+ VLOG(1) << "Untarring image from '" << tarPath
+ << "' to '" << directory << "'";
+
+ // Untar store_discovery_local_dir/name.tar into directory/.
+ // TODO(tnachen): Terminate tar process when slave exits.
+ const vector<string> argv = {
+ "tar",
+ "-C",
+ directory,
+ "-x",
+ "-f",
+ tarPath
+ };
+
+ Try<Subprocess> s = subprocess(
+ "tar",
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"));
+
+ if (s.isError()) {
+ return Failure("Failed to create tar subprocess: " + s.error());
+ }
+
+ return s.get().status()
+ .then([tarPath](const Option<int>& status) -> Future<Nothing> {
+ if (status.isNone()) {
+ return Failure("Failed to reap status for tar subprocess in " +
+ tarPath);
+ }
+ if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) {
+ return Failure("Untar image failed with exit code: " +
+ WSTRINGIFY(status.get()));
+ }
+
+ return Nothing();
+ });
+}
+
+
+static Result<string> getParentId(
+ const string& directory,
+ const string& layerId)
+{
+ Try<string> manifest =
+ os::read(paths::getImageArchiveLayerManifestPath(directory, layerId));
+
+ if (manifest.isError()) {
+ return Error("Failed to read manifest: " + manifest.error());
+ }
+
+ Try<JSON::Object> json = JSON::parse<JSON::Object>(manifest.get());
+ if (json.isError()) {
+ return Error("Failed to parse manifest: " + json.error());
+ }
+
+ Result<JSON::String> parentId = json.get().find<JSON::String>("parent");
+ if (parentId.isNone() || (parentId.isSome() && parentId.get() == "")) {
+ return None();
+ } else if (parentId.isError()) {
+ return Error("Failed to read parent of layer: " + parentId.error());
+ }
+
+ return parentId.get().value;
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::putImage(
+ const Image::Name& name,
+ const string& directory)
+{
+ Try<string> value =
+ os::read(paths::getImageArchiveRepositoriesPath(directory));
+
+ if (value.isError()) {
+ return Failure("Failed to read repository JSON: " + value.error());
+ }
+
+ Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get());
+ if (json.isError()) {
+ return Failure("Failed to parse JSON: " + json.error());
+ }
+
+ Result<JSON::Object> repositoryValue =
+ json.get().find<JSON::Object>(name.repository());
+
+ if (repositoryValue.isError()) {
+ return Failure("Failed to find repository: " + repositoryValue.error());
+ } else if (repositoryValue.isNone()) {
+ return Failure("Repository '" + name.repository() + "' is not found");
+ }
+
+ const JSON::Object repositoryJson = repositoryValue.get();
+
+ // We don't use JSON find here because a tag might contain a '.'.
+ std::map<string, JSON::Value>::const_iterator entry =
+ repositoryJson.values.find(name.tag());
+
+ if (entry == repositoryJson.values.end()) {
+ return Failure("Tag '" + name.tag() + "' is not found");
+ } else if (!entry->second.is<JSON::String>()) {
+ return Failure("Tag JSON value expected to be JSON::String");
+ }
+
+ const string layerId = entry->second.as<JSON::String>().value;
+
+ Try<string> manifest =
+ os::read(paths::getImageArchiveLayerManifestPath(directory, layerId));
+
+ if (manifest.isError()) {
+ return Failure("Failed to read manifest: " + manifest.error());
+ }
+
+ Try<JSON::Object> manifestJson = JSON::parse<JSON::Object>(manifest.get());
+ if (manifestJson.isError()) {
+ return Failure("Failed to parse manifest: " + manifestJson.error());
+ }
+
+ vector<string> layerIds;
+ layerIds.push_back(layerId);
+ Result<string> parentId = getParentId(directory, layerId);
+ while (parentId.isSome()) {
+ layerIds.insert(layerIds.begin(), parentId.get());
+ parentId = getParentId(directory, parentId.get());
+ }
+
+ if (parentId.isError()) {
+ return Failure("Failed to find parent layer id of layer '" + layerId +
+ "': " + parentId.error());
+ }
+
+ return putLayers(directory, layerIds);
+}
+
+
+Future<list<pair<string, string>>> LocalPullerProcess::putLayers(
+ const string& directory,
+ const vector<string>& layerIds)
+{
+ list<Future<pair<string, string>>> futures;
+ foreach (const string& layerId, layerIds) {
+ futures.push_back(putLayer(directory, layerId));
+ }
+
+ return collect(futures);
+}
+
+
+Future<pair<string, string>> LocalPullerProcess::putLayer(
+ const string& directory,
+ const string& layerId)
+{
+ // We untar the layer from source into a directory, then move the
+ // layer into store. We do this instead of untarring directly to
+ // store to make sure we don't end up with partially untarred layer
+ // rootfs.
+
+ const string localRootfsPath =
+ paths::getImageArchiveLayerRootfsPath(directory, layerId);
+
+ // Image layer has been untarred but is not present in the store directory.
+ if (os::exists(localRootfsPath)) {
+ LOG(WARNING) << "Image layer '" << layerId << "' rootfs present at but not "
+ << "in store directory '" << localRootfsPath << "'. Removing "
+ << "staged rootfs and untarring layer again.";
+
+ Try<Nothing> rmdir = os::rmdir(localRootfsPath);
+ if (rmdir.isError()) {
+ return Failure("Failed to remove incomplete staged rootfs for layer '" +
+ layerId + "': " + rmdir.error());
+ }
+ }
+
+ Try<Nothing> mkdir = os::mkdir(localRootfsPath);
+ if (mkdir.isError()) {
+ return Failure("Failed to create rootfs path '" + localRootfsPath +
+ "': " + mkdir.error());
+ }
+
+ // Untar directory/id/layer.tar into directory/id/rootfs.
+ // The tar file will be removed when the staging directory is
+ // removed.
+ const vector<string> argv = {
+ "tar",
+ "-C",
+ localRootfsPath,
+ "-x",
+ "-f",
+ paths::getImageArchiveLayerTarPath(directory, layerId)
+ };
+
+ Try<Subprocess> s = subprocess(
+ "tar",
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PATH("/dev/null"));
+
+ if (s.isError()) {
+ return Failure("Failed to create tar subprocess: " + s.error());
+ }
+
+ return s.get().status()
+ .then([directory, layerId](
+ const Option<int>& status) -> Future<pair<string, string>> {
+ if (status.isNone()) {
+ return Failure("Failed to reap subprocess to untar image");
+ } else if (!WIFEXITED(status.get()) || WEXITSTATUS(status.get()) != 0) {
+ return Failure("Untar failed with exit code: " +
+ WSTRINGIFY(status.get()));
+ }
+
+ const string rootfsPath =
+ paths::getImageArchiveLayerRootfsPath(directory, layerId);
+
+ if (!os::exists(rootfsPath)) {
+ return Failure("Failed to find the rootfs path after extracting layer"
+ " '" + layerId + "'");
+ }
+
+ return pair<string, string>(layerId, rootfsPath);
+ });
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
new file mode 100644
index 0000000..87d8002
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
+#define __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
+
+#include "slave/containerizer/mesos/provisioner/store.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward declaration.
+class LocalPullerProcess;
+
+
+/**
+ * LocalPuller assumes Docker images are stored in a local directory
+ * (configured with flags.docker_local_archives_dir), with all the
+ * images saved as tars with file names in the form of <repo>:<tag>.tar.
+ */
+class LocalPuller : public Puller
+{
+public:
+ explicit LocalPuller(const Flags& flags);
+
+ ~LocalPuller();
+
+ process::Future<std::list<std::pair<std::string, std::string>>> pull(
+ const Image::Name& name,
+ const std::string& directory);
+
+private:
+ LocalPuller& operator=(const LocalPuller&) = delete; // Not assignable.
+ LocalPuller(const LocalPuller&) = delete; // Not copyable.
+
+ process::Owned<LocalPullerProcess> process;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/message.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/message.hpp b/src/slave/containerizer/mesos/provisioner/docker/message.hpp
new file mode 100644
index 0000000..bb5248c
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/message.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MESSAGES_DOCKER_PROVISIONER_HPP__
+#define __MESSAGES_DOCKER_PROVISIONER_HPP__
+
+#include <stout/strings.hpp>
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "slave/containerizer/mesos/provisioner/docker/message.pb.h"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Docker expects the image to be specified on the command line as:
+// [REGISTRY_HOST[:REGISTRY_PORT]/]REPOSITORY[:TAG|@TYPE:DIGEST]
+//
+// This format is inherently ambiguous when dealing with repository
+// names that include forward slashes. To disambiguate, the docker
+// code looks for '.', or ':', or 'localhost' to decide if the
+// first component is a registry or a respository name. For more
+// detail, drill into the implementation of docker pull.
+//
+// TODO(bmahler): We currently store the digest as a tag, does
+// that makes sense?
+//
+// TODO(bmahler): Validate based on docker's validation logic
+// and return a Try here.
+inline Image::Name parseImageName(std::string s)
+{
+ Image::Name name;
+
+ // Extract the digest.
+ if (strings::contains(s, "@")) {
+ std::vector<std::string> split = strings::split(s, "@");
+
+ s = split[0];
+ name.set_tag(split[1]);
+ }
+
+ // Remove the tag. We need to watch out for a
+ // host:port registry, which also contains ':'.
+ if (strings::contains(s, ":")) {
+ std::vector<std::string> split = strings::split(s, ":");
+
+ // The tag must be the last component. If a slash is
+ // present there is a registry port and no tag.
+ if (!strings::contains(split.back(), "/")) {
+ name.set_tag(split.back());
+ split.pop_back();
+
+ s = strings::join(":", split);
+ }
+ }
+
+ // Default to the 'latest' tag when omitted.
+ if (name.tag().empty()) {
+ name.set_tag("latest");
+ }
+
+ // Extract the registry and repository. The first component can
+ // either be the registry, or the first part of the repository!
+ // We resolve this ambiguity using the same hacks used in the
+ // docker code ('.', ':', 'localhost' indicate a registry).
+ std::vector<std::string> split = strings::split(s, "/", 2);
+
+ if (split.size() == 1) {
+ name.set_repository(s);
+ } else if (strings::contains(split[0], ".") ||
+ strings::contains(split[0], ":") ||
+ split[0] == "localhost") {
+ name.set_registry(split[0]);
+ name.set_repository(split[1]);
+ } else {
+ name.set_repository(s);
+ }
+
+ return name;
+}
+
+
+inline std::ostream& operator<<(
+ std::ostream& stream,
+ const Image::Name& name)
+{
+ if (name.has_registry()) {
+ return stream << name.registry() << "/" << name.repository() << ":"
+ << name.tag();
+ }
+
+ return stream << name.repository() << ":" << name.tag();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MESSAGES_DOCKER_PROVISIONER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/message.proto
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/message.proto b/src/slave/containerizer/mesos/provisioner/docker/message.proto
new file mode 100644
index 0000000..c33e0c5
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/message.proto
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos/mesos.proto";
+
+package mesos.internal.slave.docker;
+
+/**
+ * A Docker Image name and the layer ids of the layers that comprise the image.
+ * The layerIds are ordered, with the root layer id (no parent layer id) first
+ * and the leaf layer id last.
+ */
+message Image {
+ message Name {
+ optional string registry = 1;
+ required string repository = 2;
+
+ // TODO(bmahler): This may hold a tag or a digest, split these?
+ required string tag = 3;
+ }
+
+ required Name name = 1;
+
+ // The order of the layers represents the dependency between layers.
+ repeated string layer_ids = 2;
+}
+
+
+message Images {
+ repeated Image images = 1;
+}
+
+
+/**
+* Protobuf for the Docker image manifest JSON schema:
+* https://github.com/docker/distribution/blob/master/docs/spec/manifest-v2-1.md
+*/
+message DockerImageManifest {
+ required string name = 1;
+ required string tag = 2;
+ required string architecture = 3;
+
+ message FsLayers {
+ required string blobSum = 1;
+ }
+
+ repeated FsLayers fsLayers = 4;
+
+ message History {
+ message V1Compatibility {
+ required string id = 1;
+ required string parent = 2;
+ }
+
+ required V1Compatibility v1Compatibility = 1;
+ }
+
+ repeated History history = 5;
+ required uint32 schemaVersion = 6;
+
+ message Signatures {
+
+ //JOSE (A JSON Web Signature).
+ message Header {
+
+ //JSON Web Key.
+ message Jwk {
+ required string crv = 1;
+ required string kid = 2;
+ required string kty = 3;
+ required string x = 4;
+ required string y = 5;
+ }
+
+ optional Jwk jwk = 1;
+ required string alg = 2;
+ }
+
+ required Header header = 1;
+ required string signature = 2;
+ required string protected = 3;
+ }
+
+ repeated Signatures signatures = 7;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
new file mode 100644
index 0000000..af6f5b8
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.cpp
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
+
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/foreach.hpp>
+#include <stout/hashset.hpp>
+#include <stout/os.hpp>
+#include <stout/protobuf.hpp>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/owned.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/state.hpp"
+
+using namespace process;
+
+using std::list;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class MetadataManagerProcess : public process::Process<MetadataManagerProcess>
+{
+public:
+ MetadataManagerProcess(const Flags& _flags) : flags(_flags) {}
+
+ ~MetadataManagerProcess() {}
+
+ Future<Nothing> recover();
+
+ Future<Image> put(
+ const Image::Name& name,
+ const std::vector<std::string>& layerIds);
+
+ Future<Option<Image>> get(const Image::Name& name);
+
+ // TODO(chenlily): Implement removal of unreferenced images.
+
+private:
+ // Write out metadata manager state to persistent store.
+ Try<Nothing> persist();
+
+ const Flags flags;
+
+ // This is a lookup table for images that are stored in memory. It is keyed
+ // by the name of the Image.
+ // For example, "ubuntu:14.04" -> ubuntu14:04 Image.
+ hashmap<std::string, Image> storedImages;
+};
+
+
+Try<Owned<MetadataManager>> MetadataManager::create(const Flags& flags)
+{
+ Owned<MetadataManagerProcess> process(new MetadataManagerProcess(flags));
+
+ return Owned<MetadataManager>(new MetadataManager(process));
+}
+
+
+MetadataManager::MetadataManager(Owned<MetadataManagerProcess> process)
+ : process(process)
+{
+ process::spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+MetadataManager::~MetadataManager()
+{
+ process::terminate(process.get());
+ process::wait(process.get());
+}
+
+
+Future<Nothing> MetadataManager::recover()
+{
+ return process::dispatch(process.get(), &MetadataManagerProcess::recover);
+}
+
+
+Future<Image> MetadataManager::put(
+ const Image::Name& name,
+ const vector<string>& layerIds)
+{
+ return dispatch(
+ process.get(),
+ &MetadataManagerProcess::put,
+ name,
+ layerIds);
+}
+
+
+Future<Option<Image>> MetadataManager::get(const Image::Name& name)
+{
+ return dispatch(process.get(), &MetadataManagerProcess::get, name);
+}
+
+
+Future<Image> MetadataManagerProcess::put(
+ const Image::Name& name,
+ const vector<string>& layerIds)
+{
+ const string imageName = stringify(name);
+
+ Image dockerImage;
+ dockerImage.mutable_name()->CopyFrom(name);
+ foreach (const string& layerId, layerIds) {
+ dockerImage.add_layer_ids(layerId);
+ }
+
+ storedImages[imageName] = dockerImage;
+
+ Try<Nothing> status = persist();
+ if (status.isError()) {
+ return Failure("Failed to save state of Docker images: " + status.error());
+ }
+
+ return dockerImage;
+}
+
+
+Future<Option<Image>> MetadataManagerProcess::get(
+ const Image::Name& name)
+{
+ const string imageName = stringify(name);
+
+ if (!storedImages.contains(imageName)) {
+ return None();
+ }
+
+ return storedImages[imageName];
+}
+
+
+Try<Nothing> MetadataManagerProcess::persist()
+{
+ Images images;
+
+ foreachvalue (const Image& image, storedImages) {
+ images.add_images()->CopyFrom(image);
+ }
+
+ Try<Nothing> status = state::checkpoint(
+ paths::getStoredImagesPath(flags.docker_store_dir), images);
+ if (status.isError()) {
+ return Error("Failed to perform checkpoint: " + status.error());
+ }
+
+ return Nothing();
+}
+
+
+Future<Nothing> MetadataManagerProcess::recover()
+{
+ string storedImagesPath = paths::getStoredImagesPath(flags.docker_store_dir);
+
+ if (!os::exists(storedImagesPath)) {
+ LOG(INFO) << "No images to load from disk. Docker provisioner image "
+ << "storage path '" << storedImagesPath << "' does not exist";
+ return Nothing();
+ }
+
+ Result<Images> images = ::protobuf::read<Images>(storedImagesPath);
+ if (images.isError()) {
+ return Failure("Failed to read protobuf for Docker provisioner image: " +
+ images.error());
+ }
+
+ foreach (const Image image, images.get().images()) {
+ vector<string> missingLayerIds;
+ foreach (const string layerId, image.layer_ids()) {
+ const string rootfsPath =
+ paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId);
+
+ if (!os::exists(rootfsPath)) {
+ missingLayerIds.push_back(layerId);
+ }
+ }
+
+ if (!missingLayerIds.empty()) {
+ LOG(WARNING) << "Skipped loading image '" << stringify(image.name())
+ << "' due to missing layers: " << stringify(missingLayerIds);
+ continue;
+ }
+
+ const string imageName = stringify(image.name());
+ if (storedImages.contains(imageName)) {
+ LOG(WARNING) << "Found duplicate image in recovery for image name '"
+ << imageName << "'";
+ } else {
+ storedImages[imageName] = image;
+ }
+ }
+
+ LOG(INFO) << "Loaded " << storedImages.size() << " Docker images";
+
+ return Nothing();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
new file mode 100644
index 0000000..dbae8d8
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__
+#define __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__
+
+#include <list>
+#include <string>
+
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include "slave/containerizer/mesos/provisioner/provisioner.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward Declaration.
+class MetadataManagerProcess;
+
+/**
+ * The MetadataManager tracks the Docker images cached by the
+ * provisioner that are stored on disk. It keeps track of the layers
+ * that Docker images are composed of and recovers Image objects
+ * upon initialization by checking for dependent layers stored on disk.
+ * Currently, image layers are stored indefinitely, with no garbage
+ * collection of unreferenced image layers.
+ */
+class MetadataManager
+{
+public:
+ static Try<process::Owned<MetadataManager>> create(const Flags& flags);
+
+ ~MetadataManager();
+
+ /**
+ * Recover all stored Image and its layer references.
+ */
+ process::Future<Nothing> recover();
+
+ /**
+ * Create an Image, put it in metadata manager and persist the reference
+ * store state to disk.
+ *
+ * @param name the name of the Docker image to place in the reference
+ * store.
+ * @param layerIds the list of layer ids that comprise the Docker image in
+ * order where the root layer's id (no parent layer) is first
+ * and the leaf layer's id is last.
+ */
+ process::Future<Image> put(
+ const Image::Name& name,
+ const std::vector<std::string>& layerIds);
+
+ /**
+ * Retrieve Image based on image name if it is among the Images
+ * stored in memory.
+ *
+ * @param name the name of the Docker image to retrieve
+ */
+ process::Future<Option<Image>> get(const Image::Name& name);
+
+private:
+ explicit MetadataManager(process::Owned<MetadataManagerProcess> process);
+
+ MetadataManager(const MetadataManager&); // Not copyable.
+ MetadataManager& operator=(const MetadataManager&); // Not assignable.
+
+ process::Owned<MetadataManagerProcess> process;
+};
+
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_METADATA_MANAGER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
new file mode 100644
index 0000000..e3392ea
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+
+#include <stout/path.hpp>
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace paths {
+
+string getStagingDir(const string& storeDir)
+{
+ return path::join(storeDir, "staging");
+}
+
+
+string getStagingTempDir(const string& storeDir)
+{
+ return path::join(getStagingDir(storeDir), "XXXXXX");
+}
+
+
+string getImageArchiveTarPath(
+ const string& discoveryDir,
+ const string& name)
+{
+ return path::join(discoveryDir, name + ".tar");
+}
+
+
+string getImageArchiveRepositoriesPath(const string& archivePath)
+{
+ return path::join(archivePath, "repositories");
+}
+
+
+std::string getImageArchiveLayerPath(
+ const string& archivePath,
+ const string& layerId)
+{
+ return path::join(archivePath, layerId);
+}
+
+
+string getImageArchiveLayerManifestPath(
+ const string& archivePath,
+ const string& layerId)
+{
+ return path::join(getImageArchiveLayerPath(archivePath, layerId), "json");
+}
+
+
+string getImageArchiveLayerTarPath(
+ const string& archivePath,
+ const string& layerId)
+{
+ return path::join(
+ getImageArchiveLayerPath(archivePath, layerId), "layer.tar");
+}
+
+
+string getImageArchiveLayerRootfsPath(
+ const string& archivePath,
+ const string& layerId)
+{
+ return path::join(getImageArchiveLayerPath(archivePath, layerId), "rootfs");
+}
+
+
+string getImageLayerPath(
+ const string& storeDir,
+ const string& layerId)
+{
+ return path::join(storeDir, "layers", layerId);
+}
+
+
+string getImageLayerRootfsPath(
+ const string& storeDir,
+ const string& layerId)
+{
+ return path::join(getImageLayerPath(storeDir, layerId), "rootfs");
+}
+
+
+string getStoredImagesPath(const string& storeDir)
+{
+ return path::join(storeDir, "storedImages");
+}
+
+} // namespace paths {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
new file mode 100644
index 0000000..18beb2e
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_PATHS_HPP__
+#define __PROVISIONER_DOCKER_PATHS_HPP__
+
+#include <list>
+#include <string>
+
+#include <mesos/mesos.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace paths {
+
+/**
+ * The Docker store file system layout is as follows:
+ * Image store dir ('--docker_store_dir' slave flag)
+ * |--staging
+ * |-- <temp_dir_archive>
+ * |-- <layer_id>
+ * |-- rootfs
+ * |--layers
+ * |--<layer_id>
+ * |--rootfs
+ * |--storedImages (file holding on cached images)
+ */
+
+std::string getStagingDir(const std::string& storeDir);
+
+
+std::string getStagingTempDir(const std::string& storeDir);
+
+
+std::string getImageArchiveTarPath(
+ const std::string& discoveryDir,
+ const std::string& name);
+
+
+std::string getImageArchiveRepositoriesPath(const std::string& archivePath);
+
+
+std::string getImageArchiveLayerPath(
+ const std::string& archivePath,
+ const std::string& layerId);
+
+
+std::string getImageArchiveLayerManifestPath(
+ const std::string& archivePath,
+ const std::string& layerId);
+
+
+std::string getImageArchiveLayerTarPath(
+ const std::string& archivePath,
+ const std::string& layerId);
+
+
+std::string getImageArchiveLayerRootfsPath(
+ const std::string& archivePath,
+ const std::string& layerId);
+
+
+std::string getImageLayerPath(
+ const std::string& storeDir,
+ const std::string& layerId);
+
+
+std::string getImageLayerRootfsPath(
+ const std::string& storeDir,
+ const std::string& layerId);
+
+
+std::string getStoredImagesPath(const std::string& storeDir);
+
+} // namespace paths {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_PATHS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
new file mode 100644
index 0000000..f61f9e5
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
+
+using std::string;
+
+using process::Owned;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+Try<Owned<Puller>> Puller::create(const Flags& flags)
+{
+ const string puller = flags.docker_puller;
+
+ if (puller == "local") {
+ return Owned<Puller>(new LocalPuller(flags));
+ }
+
+ return Error("Unknown or unsupported docker puller: " + puller);
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
new file mode 100644
index 0000000..8010b8a
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_PULLER_HPP__
+#define __PROVISIONER_DOCKER_PULLER_HPP__
+
+#include <list>
+#include <utility>
+
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class Puller
+{
+public:
+ static Try<process::Owned<Puller>> create(const Flags& flags);
+
+ virtual ~Puller() {}
+
+ /**
+ * Pull a Docker image layers into the specified directory, and
+ * return the list of layer ids in that image in the right
+ * dependency order, and also return the directory where
+ * the puller puts its changeset.
+ *
+ * @param name The name of the image.
+ * @param directory The target directory to store the layers.
+ * @return list of layers maped to its local directory ordered by its
+ * dependency.
+ */
+ virtual process::Future<std::list<std::pair<std::string, std::string>>> pull(
+ const docker::Image::Name& name,
+ const std::string& directory) = 0;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+
+#endif // __PROVISIONER_DOCKER_PULLER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
new file mode 100644
index 0000000..5a01f1b
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.cpp
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/http.hpp>
+#include <process/io.hpp>
+
+#include <stout/os.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp"
+
+using std::string;
+using std::vector;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+
+using process::http::Request;
+using process::http::Response;
+using process::http::URL;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+using FileSystemLayerInfo = RegistryClient::FileSystemLayerInfo;
+
+using ManifestResponse = RegistryClient::ManifestResponse;
+
+const Duration RegistryClient::DEFAULT_MANIFEST_TIMEOUT_SECS = Seconds(10);
+
+const size_t RegistryClient::DEFAULT_MANIFEST_MAXSIZE_BYTES = 4096;
+
+static const uint16_t DEFAULT_SSL_PORT = 443;
+
+class RegistryClientProcess : public Process<RegistryClientProcess>
+{
+public:
+ static Try<Owned<RegistryClientProcess>> create(
+ const URL& registry,
+ const URL& authServer,
+ const Option<RegistryClient::Credentials>& creds);
+
+ Future<RegistryClient::ManifestResponse> getManifest(
+ const string& path,
+ const Option<string>& tag,
+ const Duration& timeout);
+
+ Future<size_t> getBlob(
+ const string& path,
+ const Option<string>& digest,
+ const Path& filePath,
+ const Duration& timeout,
+ size_t maxSize);
+
+private:
+ RegistryClientProcess(
+ const URL& registryServer,
+ const Owned<TokenManager>& tokenManager,
+ const Option<RegistryClient::Credentials>& creds);
+
+ Future<Response> doHttpGet(
+ const URL& url,
+ const Option<process::http::Headers>& headers,
+ const Duration& timeout,
+ bool resend,
+ const Option<string>& lastResponse) const;
+
+ Try<process::http::Headers> getAuthenticationAttributes(
+ const Response& httpResponse) const;
+
+ const URL registryServer_;
+ Owned<TokenManager> tokenManager_;
+ const Option<RegistryClient::Credentials> credentials_;
+
+ RegistryClientProcess(const RegistryClientProcess&) = delete;
+ RegistryClientProcess& operator = (const RegistryClientProcess&) = delete;
+};
+
+
+Try<Owned<RegistryClient>> RegistryClient::create(
+ const URL& registryServer,
+ const URL& authServer,
+ const Option<Credentials>& creds)
+{
+ Try<Owned<RegistryClientProcess>> process =
+ RegistryClientProcess::create(authServer, registryServer, creds);
+
+ if (process.isError()) {
+ return Error(process.error());
+ }
+
+ return Owned<RegistryClient>(
+ new RegistryClient(authServer, registryServer, creds, process.get()));
+}
+
+
+RegistryClient::RegistryClient(
+ const URL& registryServer,
+ const URL& authServer,
+ const Option<Credentials>& creds,
+ const Owned<RegistryClientProcess>& process)
+ : registryServer_(registryServer),
+ authServer_(authServer),
+ credentials_(creds),
+ process_(process)
+{
+ spawn(CHECK_NOTNULL(process_.get()));
+}
+
+
+RegistryClient::~RegistryClient()
+{
+ terminate(process_.get());
+ process::wait(process_.get());
+}
+
+
+Future<ManifestResponse> RegistryClient::getManifest(
+ const string& _path,
+ const Option<string>& _tag,
+ const Option<Duration>& _timeout)
+{
+ Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS);
+
+ return dispatch(
+ process_.get(),
+ &RegistryClientProcess::getManifest,
+ _path,
+ _tag,
+ timeout);
+}
+
+
+Future<size_t> RegistryClient::getBlob(
+ const string& _path,
+ const Option<string>& _digest,
+ const Path& _filePath,
+ const Option<Duration>& _timeout,
+ const Option<size_t>& _maxSize)
+{
+ Duration timeout = _timeout.getOrElse(DEFAULT_MANIFEST_TIMEOUT_SECS);
+ size_t maxSize = _maxSize.getOrElse(DEFAULT_MANIFEST_MAXSIZE_BYTES);
+
+ return dispatch(
+ process_.get(),
+ &RegistryClientProcess::getBlob,
+ _path,
+ _digest,
+ _filePath,
+ timeout,
+ maxSize);
+}
+
+
+Try<Owned<RegistryClientProcess>> RegistryClientProcess::create(
+ const URL& registryServer,
+ const URL& authServer,
+ const Option<RegistryClient::Credentials>& creds)
+{
+ Try<Owned<TokenManager>> tokenMgr = TokenManager::create(authServer);
+ if (tokenMgr.isError()) {
+ return Error("Failed to create token manager: " + tokenMgr.error());
+ }
+
+ return Owned<RegistryClientProcess>(
+ new RegistryClientProcess(registryServer, tokenMgr.get(), creds));
+}
+
+
+RegistryClientProcess::RegistryClientProcess(
+ const URL& registryServer,
+ const Owned<TokenManager>& tokenMgr,
+ const Option<RegistryClient::Credentials>& creds)
+ : registryServer_(registryServer),
+ tokenManager_(tokenMgr),
+ credentials_(creds) {}
+
+
+Try<process::http::Headers> RegistryClientProcess::getAuthenticationAttributes(
+ const Response& httpResponse) const
+{
+ if (httpResponse.headers.find("WWW-Authenticate") ==
+ httpResponse.headers.end()) {
+ return Error("Failed to find WWW-Authenticate header value");
+ }
+
+ const string& authString = httpResponse.headers.at("WWW-Authenticate");
+
+ const vector<string> authStringTokens = strings::tokenize(authString, " ");
+ if ((authStringTokens.size() != 2) || (authStringTokens[0] != "Bearer")) {
+ // TODO(jojy): Look at various possibilities of auth response. We currently
+ // assume that the string will have realm information.
+ return Error("Invalid authentication header value: " + authString);
+ }
+
+ const vector<string> authParams = strings::tokenize(authStringTokens[1], ",");
+
+ process::http::Headers authAttributes;
+ auto addAttribute = [&authAttributes](
+ const string& param) -> Try<Nothing> {
+ const vector<string> paramTokens =
+ strings::tokenize(param, "=\"");
+
+ if (paramTokens.size() != 2) {
+ return Error(
+ "Failed to get authentication attribute from response parameter " +
+ param);
+ }
+
+ authAttributes.insert({paramTokens[0], paramTokens[1]});
+
+ return Nothing();
+ };
+
+ foreach (const string& param, authParams) {
+ Try<Nothing> addRes = addAttribute(param);
+ if (addRes.isError()) {
+ return Error(addRes.error());
+ }
+ }
+
+ return authAttributes;
+}
+
+
+Future<Response> RegistryClientProcess::doHttpGet(
+ const URL& url,
+ const Option<process::http::Headers>& headers,
+ const Duration& timeout,
+ bool resend,
+ const Option<string>& lastResponseStatus) const
+{
+ return process::http::get(url, headers)
+ .after(timeout, [](
+ const Future<Response>& httpResponseFuture) -> Future<Response> {
+ return Failure("Response timeout");
+ })
+ .then(defer(self(), [=](
+ const Response& httpResponse) -> Future<Response> {
+ VLOG(1) << "Response status: " + httpResponse.status;
+
+ // Set the future if we get a OK response.
+ if (httpResponse.status == "200 OK") {
+ return httpResponse;
+ } else if (httpResponse.status == "400 Bad Request") {
+ Try<JSON::Object> errorResponse =
+ JSON::parse<JSON::Object>(httpResponse.body);
+
+ if (errorResponse.isError()) {
+ return Failure("Failed to parse bad request response JSON: " +
+ errorResponse.error());
+ }
+
+ std::ostringstream out;
+ bool first = true;
+ Result<JSON::Array> errorObjects =
+ errorResponse.get().find<JSON::Array>("errors");
+
+ if (errorObjects.isError()) {
+ return Failure("Failed to find 'errors' in bad request response: " +
+ errorObjects.error());
+ } else if (errorObjects.isNone()) {
+ return Failure("Errors not found in bad request response");
+ }
+
+ foreach (const JSON::Value& error, errorObjects.get().values) {
+ Result<JSON::String> message =
+ error.as<JSON::Object>().find<JSON::String>("message");
+
+ if (message.isError()) {
+ return Failure("Failed to parse bad request error message: " +
+ message.error());
+ } else if (message.isNone()) {
+ continue;
+ }
+
+ if (first) {
+ out << message.get().value;
+ first = false;
+ } else {
+ out << ", " << message.get().value;
+ }
+ }
+
+ return Failure("Received Bad request, errors: [" + out.str() + "]");
+ }
+
+ // Prevent infinite recursion.
+ if (lastResponseStatus.isSome() &&
+ (lastResponseStatus.get() == httpResponse.status)) {
+ return Failure("Invalid response: " + httpResponse.status);
+ }
+
+ // If resend is not set, we dont try again and stop here.
+ if (!resend) {
+ return Failure("Bad response: " + httpResponse.status);
+ }
+
+ // Handle 401 Unauthorized.
+ if (httpResponse.status == "401 Unauthorized") {
+ Try<process::http::Headers> authAttributes =
+ getAuthenticationAttributes(httpResponse);
+
+ if (authAttributes.isError()) {
+ return Failure(
+ "Failed to get authentication attributes: " +
+ authAttributes.error());
+ }
+
+ // TODO(jojy): Currently only handling TLS/cert authentication.
+ Future<Token> tokenResponse = tokenManager_->getToken(
+ authAttributes.get().at("service"),
+ authAttributes.get().at("scope"),
+ None());
+
+ return tokenResponse
+ .after(timeout, [=](
+ Future<Token> tokenResponse) -> Future<Token> {
+ tokenResponse.discard();
+ return Failure("Token response timeout");
+ })
+ .then(defer(self(), [=](
+ const Future<Token>& tokenResponse) {
+ // Send request with acquired token.
+ process::http::Headers authHeaders = {
+ {"Authorization", "Bearer " + tokenResponse.get().raw}
+ };
+
+ return doHttpGet(
+ url,
+ authHeaders,
+ timeout,
+ true,
+ httpResponse.status);
+ }));
+ } else if (httpResponse.status == "307 Temporary Redirect") {
+ // Handle redirect.
+
+ // TODO(jojy): Add redirect functionality in http::get.
+
+ auto toURL = [](
+ const string& urlString) -> Try<URL> {
+ // TODO(jojy): Need to add functionality to URL class that parses a
+ // string to its URL components. For now, assuming:
+ // - scheme is https
+ // - path always ends with /
+
+ static const string schemePrefix = "https://";
+
+ if (!strings::contains(urlString, schemePrefix)) {
+ return Error(
+ "Failed to find expected token '" + schemePrefix +
+ "' in redirect url");
+ }
+
+ const string schemeSuffix = urlString.substr(schemePrefix.length());
+
+ const vector<string> components =
+ strings::tokenize(schemeSuffix, "/");
+
+ const string path = schemeSuffix.substr(components[0].length());
+
+ const vector<string> addrComponents =
+ strings::tokenize(components[0], ":");
+
+ uint16_t port = DEFAULT_SSL_PORT;
+ string domain = components[0];
+
+ // Parse the port.
+ if (addrComponents.size() == 2) {
+ domain = addrComponents[0];
+
+ Try<uint16_t> tryPort = numify<uint16_t>(addrComponents[1]);
+ if (tryPort.isError()) {
+ return Error(
+ "Failed to parse location: " + urlString + " for port.");
+ }
+
+ port = tryPort.get();
+ }
+
+ return URL("https", domain, port, path);
+ };
+
+ if (httpResponse.headers.find("Location") ==
+ httpResponse.headers.end()) {
+ return Failure(
+ "Invalid redirect response: 'Location' not found in headers.");
+ }
+
+ const string& location = httpResponse.headers.at("Location");
+ Try<URL> tryUrl = toURL(location);
+ if (tryUrl.isError()) {
+ return Failure(
+ "Failed to parse '" + location + "': " + tryUrl.error());
+ }
+
+ return doHttpGet(
+ tryUrl.get(),
+ headers,
+ timeout,
+ false,
+ httpResponse.status);
+ } else {
+ return Failure("Invalid response: " + httpResponse.status);
+ }
+ }));
+}
+
+
+Future<ManifestResponse> RegistryClientProcess::getManifest(
+ const string& path,
+ const Option<string>& tag,
+ const Duration& timeout)
+{
+ if (strings::contains(path, " ")) {
+ return Failure("Invalid repository path: " + path);
+ }
+
+ string repoTag = tag.getOrElse("latest");
+ if (strings::contains(repoTag, " ")) {
+ return Failure("Invalid repository tag: " + repoTag);
+ }
+
+ URL manifestURL(registryServer_);
+ manifestURL.path =
+ "v2/" + path + "/manifests/" + repoTag;
+
+ auto getManifestResponse = [](
+ const Response& httpResponse) -> Try<ManifestResponse> {
+ if (!httpResponse.headers.contains("Docker-Content-Digest")) {
+ return Error("Docker-Content-Digest header missing in response");
+ }
+
+ Try<JSON::Object> responseJSON =
+ JSON::parse<JSON::Object>(httpResponse.body);
+
+ if (responseJSON.isError()) {
+ return Error(responseJSON.error());
+ }
+
+ Result<JSON::String> name = responseJSON.get().find<JSON::String>("name");
+ if (name.isNone()) {
+ return Error("Failed to find \"name\" in manifest response");
+ }
+
+ Result<JSON::Array> fsLayers =
+ responseJSON.get().find<JSON::Array>("fsLayers");
+
+ if (fsLayers.isNone()) {
+ return Error("Failed to find \"fsLayers\" in manifest response");
+ }
+
+ Result<JSON::Array> historyArray =
+ responseJSON.get().find<JSON::Array>("history");
+
+ if (historyArray.isNone()) {
+ return Error("Failed to find \"history\" in manifest response");
+ }
+
+ if (historyArray.get().values.size() != fsLayers.get().values.size()) {
+ return Error(
+ "\"history\" and \"fsLayers\" array count mismatch"
+ "in manifest response");
+ }
+
+ vector<FileSystemLayerInfo> fsLayerInfoList;
+ size_t index = 0;
+
+ foreach (const JSON::Value& layer, fsLayers.get().values) {
+ if (!layer.is<JSON::Object>()) {
+ return Error(
+ "Failed to parse layer as a JSON object for index: " +
+ stringify(index));
+ }
+
+ const JSON::Object& layerInfoJSON = layer.as<JSON::Object>();
+
+ // Get blobsum for layer.
+ const Result<JSON::String> blobSumInfo =
+ layerInfoJSON.find<JSON::String>("blobSum");
+
+ if (blobSumInfo.isNone()) {
+ return Error("Failed to find \"blobSum\" in manifest response");
+ }
+
+ // Get history for layer.
+ if (!historyArray.get().values[index].is<JSON::Object>()) {
+ return Error(
+ "Failed to parse history as a JSON object for index: " +
+ stringify(index));
+ }
+ const JSON::Object& historyObj =
+ historyArray.get().values[index].as<JSON::Object>();
+
+ // Get layer id.
+ const Result<JSON::String> v1CompatibilityJSON =
+ historyObj.find<JSON::String>("v1Compatibility");
+
+ if (!v1CompatibilityJSON.isSome()) {
+ return Error(
+ "Failed to obtain layer v1 compability json in manifest for layer: "
+ + stringify(index));
+ }
+
+ Try<JSON::Object> v1CompatibilityObj =
+ JSON::parse<JSON::Object>(v1CompatibilityJSON.get().value);
+
+ if (!v1CompatibilityObj.isSome()) {
+ return Error(
+ "Failed to parse v1 compability json in manifest for layer: "
+ + stringify(index));
+ }
+
+ const Result<JSON::String> id =
+ v1CompatibilityObj.get().find<JSON::String>("id");
+
+ if (!id.isSome()) {
+ return Error(
+ "Failed to find \"id\" in manifest for layer: " + stringify(index));
+ }
+
+ fsLayerInfoList.emplace_back(
+ FileSystemLayerInfo{
+ blobSumInfo.get().value,
+ id.get().value,
+ });
+
+ index++;
+ }
+
+ return ManifestResponse {
+ name.get().value,
+ httpResponse.headers.at("Docker-Content-Digest"),
+ fsLayerInfoList,
+ };
+ };
+
+ return doHttpGet(manifestURL, None(), timeout, true, None())
+ .then([getManifestResponse] (
+ const Response& response) -> Future<ManifestResponse> {
+ Try<ManifestResponse> manifestResponse = getManifestResponse(response);
+
+ if (manifestResponse.isError()) {
+ return Failure(
+ "Failed to parse manifest response: " + manifestResponse.error());
+ }
+
+ return manifestResponse.get();
+ });
+}
+
+
+Future<size_t> RegistryClientProcess::getBlob(
+ const string& path,
+ const Option<string>& digest,
+ const Path& filePath,
+ const Duration& timeout,
+ size_t maxSize)
+{
+ auto prepare = ([&filePath]() -> Try<Nothing> {
+ const string dirName = filePath.dirname();
+
+ // TODO(jojy): Return more state, for example - if the directory is new.
+ Try<Nothing> dirResult = os::mkdir(dirName, true);
+ if (dirResult.isError()) {
+ return Error(
+ "Failed to create directory to download blob: " +
+ dirResult.error());
+ }
+
+ return dirResult;
+ })();
+
+ // TODO(jojy): This currently leaves a residue in failure cases. Would be
+ // ideal if we can completely rollback.
+ if (prepare.isError()) {
+ return Failure(prepare.error());
+ }
+
+ if (strings::contains(path, " ")) {
+ return Failure("Invalid repository path: " + path);
+ }
+
+ URL blobURL(registryServer_);
+ blobURL.path =
+ "v2/" + path + "/blobs/" + digest.getOrElse("");
+
+ auto saveBlob = [filePath](
+ const Response& httpResponse) -> Future<size_t> {
+ // TODO(jojy): Add verification step.
+ // TODO(jojy): Add check for max size.
+ size_t size = httpResponse.body.length();
+ Try<int> fd = os::open(
+ filePath.value,
+ O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+
+ if (fd.isError()) {
+ return Failure("Failed to open file '" + filePath.value + "': " +
+ fd.error());
+ }
+
+ return process::io::write(fd.get(), httpResponse.body)
+ .then([size](const Future<Nothing>&) { return size; })
+ .onAny([fd]() { os::close(fd.get()); } );
+ };
+
+ return doHttpGet(blobURL, None(), timeout, true, None())
+ .then([saveBlob](const Response& response) { return saveBlob(response); });
+}
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
new file mode 100644
index 0000000..1d3377e
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_client.hpp
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__
+#define __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__
+
+#include <string>
+#include <vector>
+
+#include <stout/duration.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/json.hpp>
+#include <stout/path.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace registry {
+
+// Forward declarations.
+class RegistryClientProcess;
+
+
+class RegistryClient
+{
+public:
+ /**
+ * Encapsulates information about a file system layer.
+ */
+ struct FileSystemLayerInfo {
+ // TODO(jojy): This string includes the checksum type also now. Need to
+ // separate this into checksum method and checksum.
+ const std::string checksumInfo;
+ const std::string layerId;
+ };
+
+ /**
+ * Encapsulates response of "GET Manifest" request.
+ *
+ * Reference: https://docs.docker.com/registry/spec/api
+ */
+ struct ManifestResponse {
+ const std::string name;
+ const std::string digest;
+ const std::vector<FileSystemLayerInfo> fsLayerInfoList;
+ };
+
+ /**
+ * Encapsulates auth credentials for the client sessions.
+ * TODO(jojy): Secure heap to protect the credentials.
+ */
+ struct Credentials {
+ /**
+ * UserId for basic authentication.
+ */
+ const Option<std::string> userId;
+ /**
+ * Password for basic authentication.
+ */
+ const Option<std::string> password;
+ /**
+ * Account for fetching data from registry.
+ */
+ const Option<std::string> account;
+ };
+
+ /**
+ * Factory method for creating RegistryClient objects.
+ *
+ * @param registryServer URL of docker registry server.
+ * @param authServer URL of authorization server.
+ * @param credentials credentials for client session (optional).
+ * @return RegistryClient on Success.
+ * Error on failure.
+ */
+ static Try<process::Owned<RegistryClient>> create(
+ const process::http::URL& registryServer,
+ const process::http::URL& authServer,
+ const Option<Credentials>& credentials);
+
+ /**
+ * Fetches manifest for a repository from the client's remote registry server.
+ *
+ * @param path path of the repository on the registry.
+ * @param tag unique tag that identifies the repository. Will default to
+ * latest.
+ * @param timeout Maximum time ater which the request will timeout and return
+ * a failure. Will default to RESPONSE_TIMEOUT.
+ * @return JSON object on success.
+ * Failure on process failure.
+ */
+ process::Future<ManifestResponse> getManifest(
+ const std::string& path,
+ const Option<std::string>& tag,
+ const Option<Duration>& timeout);
+
+ /**
+ * Fetches blob for a repository from the client's remote registry server.
+ *
+ * @param path path of the repository on the registry.
+ * @param digest digest of the blob (from manifest).
+ * @param filePath file path to store the fetched blob.
+ * @param timeout Maximum time ater which the request will timeout and return
+ * a failure. Will default to RESPONSE_TIMEOUT.
+ * @param maxSize Maximum size of the response thats acceptable. Will default
+ * to MAX_RESPONSE_SIZE.
+ * @return size of downloaded blob on success.
+ * Failure in case of any errors.
+ */
+ process::Future<size_t> getBlob(
+ const std::string& path,
+ const Option<std::string>& digest,
+ const Path& filePath,
+ const Option<Duration>& timeout,
+ const Option<size_t>& maxSize);
+
+ ~RegistryClient();
+
+private:
+ RegistryClient(
+ const process::http::URL& registryServer,
+ const process::http::URL& authServer,
+ const Option<Credentials>& credentials,
+ const process::Owned<RegistryClientProcess>& process);
+
+ static const Duration DEFAULT_MANIFEST_TIMEOUT_SECS;
+ static const size_t DEFAULT_MANIFEST_MAXSIZE_BYTES;
+
+ const process::http::URL registryServer_;
+ const process::http::URL authServer_;
+ const Option<Credentials> credentials_;
+ process::Owned<RegistryClientProcess> process_;
+
+ RegistryClient(const RegistryClient&) = delete;
+ RegistryClient& operator=(const RegistryClient&) = delete;
+};
+
+} // namespace registry {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_REGISTRY_CLIENT_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/spec.cpp b/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
new file mode 100644
index 0000000..2703b5d
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/spec.cpp
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stout/foreach.hpp>
+#include <stout/json.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/spec.hpp"
+
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace spec {
+
+// Validate if the specified image manifest conforms to the Docker spec.
+Option<Error> validateManifest(const DockerImageManifest& manifest)
+{
+ // Validate required fields are present,
+ // e.g., repeated fields that has to be >= 1.
+ if (manifest.fslayers_size() <= 0) {
+ return Error("FsLayers field must have at least one blobSum");
+ }
+
+ if (manifest.history_size() <= 0) {
+ return Error("History field must have at least one v1Compatibility");
+ }
+
+ if (manifest.signatures_size() <= 0) {
+ return Error("Signatures field must have at least one signature");
+ }
+
+ // Verify that blobSum and v1Compatibility numbers are equal.
+ if (manifest.fslayers_size() != manifest.history_size()) {
+ return Error("Size of blobSum and v1Compatibility must be equal");
+ }
+
+ // FsLayers field validation.
+ foreach (const docker::DockerImageManifest::FsLayers& fslayer,
+ manifest.fslayers()) {
+ const string& blobSum = fslayer.blobsum();
+ if (!strings::contains(blobSum, ":")) {
+ return Error("Incorrect blobSum format");
+ }
+ }
+
+ return None();
+}
+
+
+Try<docker::DockerImageManifest> parse(const JSON::Object& json)
+{
+ Try<docker::DockerImageManifest> manifest =
+ protobuf::parse<docker::DockerImageManifest>(json);
+
+ if (manifest.isError()) {
+ return Error("Protobuf parse failed: " + manifest.error());
+ }
+
+ Option<Error> error = validateManifest(manifest.get());
+ if (error.isSome()) {
+ return Error("Docker Image Manifest Validation failed: " +
+ error.get().message);
+ }
+
+ return manifest.get();
+}
+
+} // namespace spec {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/spec.hpp b/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
new file mode 100644
index 0000000..96e8d6d
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/spec.hpp
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_SPEC_HPP__
+#define __PROVISIONER_DOCKER_SPEC_HPP__
+
+#include <stout/error.hpp>
+#include <stout/json.hpp>
+#include <stout/option.hpp>
+
+#include <mesos/mesos.hpp>
+
+#include "slave/containerizer/mesos/provisioner/docker/message.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+namespace spec {
+
+// Validate if the specified image manifest conforms to the Docker spec.
+Option<Error> validateManifest(const docker::DockerImageManifest& manifest);
+
+// TODO(Gilbert): add validations here, e.g., Manifest, Blob, Layout, ImageID.
+
+// Parse the DockerImageManifest from the specified JSON object.
+Try<docker::DockerImageManifest> parse(const JSON::Object& json);
+
+} // namespace spec {
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_SPEC_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
new file mode 100644
index 0000000..bb02d65
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
+
+#include <list>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <stout/json.hpp>
+#include <stout/os.hpp>
+#include <stout/result.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/subprocess.hpp>
+
+#include "common/status_utils.hpp"
+
+#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+
+#include "slave/flags.hpp"
+
+using namespace process;
+
+using std::list;
+using std::pair;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+class StoreProcess : public Process<StoreProcess>
+{
+public:
+ StoreProcess(
+ const Flags& _flags,
+ const Owned<MetadataManager>& _metadataManager,
+ const Owned<Puller>& _puller)
+ : flags(_flags), metadataManager(_metadataManager), puller(_puller) {}
+
+ ~StoreProcess() {}
+
+ Future<Nothing> recover();
+
+ Future<vector<string>> get(const mesos::Image& image);
+
+private:
+ Future<Image> _get(
+ const Image::Name& name,
+ const Option<Image>& image);
+
+ Future<vector<string>> __get(const Image& image);
+
+ Future<vector<string>> moveLayers(
+ const std::string& staging,
+ const std::list<pair<string, string>>& layerPaths);
+
+ Future<Image> storeImage(
+ const Image::Name& name,
+ const std::vector<std::string>& layerIds);
+
+ Future<Nothing> moveLayer(const pair<string, string>& layerPath);
+
+ const Flags flags;
+ Owned<MetadataManager> metadataManager;
+ Owned<Puller> puller;
+};
+
+
+Try<Owned<slave::Store>> Store::create(const Flags& flags)
+{
+ Try<Owned<Puller>> puller = Puller::create(flags);
+ if (puller.isError()) {
+ return Error("Failed to create Docker puller: " + puller.error());
+ }
+
+ if (!os::exists(flags.docker_store_dir)) {
+ Try<Nothing> mkdir = os::mkdir(flags.docker_store_dir);
+ if (mkdir.isError()) {
+ return Error("Failed to create Docker store directory: " + mkdir.error());
+ }
+ }
+
+ if (!os::exists(paths::getStagingDir(flags.docker_store_dir))) {
+ Try<Nothing> mkdir =
+ os::mkdir(paths::getStagingDir(flags.docker_store_dir));
+
+ if (mkdir.isError()) {
+ return Error("Failed to create Docker store staging directory: " +
+ mkdir.error());
+ }
+ }
+
+ Try<Owned<MetadataManager>> metadataManager = MetadataManager::create(flags);
+ if (metadataManager.isError()) {
+ return Error(metadataManager.error());
+ }
+
+ Owned<StoreProcess> process(
+ new StoreProcess(flags, metadataManager.get(), puller.get()));
+
+ return Owned<slave::Store>(new Store(process));
+}
+
+
+Store::Store(const Owned<StoreProcess>& _process) : process(_process)
+{
+ process::spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+Store::~Store()
+{
+ process::terminate(process.get());
+ process::wait(process.get());
+}
+
+
+Future<Nothing> Store::recover()
+{
+ return dispatch(process.get(), &StoreProcess::recover);
+}
+
+
+Future<vector<string>> Store::get(const mesos::Image& image)
+{
+ return dispatch(process.get(), &StoreProcess::get, image);
+}
+
+
+Future<vector<string>> StoreProcess::get(const mesos::Image& image)
+{
+ if (image.type() != mesos::Image::DOCKER) {
+ return Failure("Docker provisioner store only supports Docker images");
+ }
+
+ Image::Name imageName = parseImageName(image.docker().name());
+
+ return metadataManager->get(imageName)
+ .then(defer(self(), &Self::_get, imageName, lambda::_1))
+ .then(defer(self(), &Self::__get, lambda::_1));
+}
+
+
+Future<Image> StoreProcess::_get(
+ const Image::Name& name,
+ const Option<Image>& image)
+{
+ if (image.isSome()) {
+ return image.get();
+ }
+
+ Try<string> staging =
+ os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
+
+ if (staging.isError()) {
+ return Failure("Failed to create a staging directory");
+ }
+
+ return puller->pull(name, staging.get())
+ .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1))
+ .then(defer(self(), &Self::storeImage, name, lambda::_1))
+ .onAny([staging]() {
+ Try<Nothing> rmdir = os::rmdir(staging.get());
+ if (rmdir.isError()) {
+ LOG(WARNING) << "Failed to remove staging directory: " << rmdir.error();
+ }
+ });
+}
+
+
+Future<vector<string>> StoreProcess::__get(const Image& image)
+{
+ vector<string> layerDirectories;
+ foreach (const string& layer, image.layer_ids()) {
+ layerDirectories.push_back(
+ paths::getImageLayerRootfsPath(
+ flags.docker_store_dir, layer));
+ }
+
+ return layerDirectories;
+}
+
+
+Future<Nothing> StoreProcess::recover()
+{
+ return metadataManager->recover();
+}
+
+
+Future<vector<string>> StoreProcess::moveLayers(
+ const string& staging,
+ const list<pair<string, string>>& layerPaths)
+{
+ list<Future<Nothing>> futures;
+ foreach (const auto& layerPath, layerPaths) {
+ futures.push_back(moveLayer(layerPath));
+ }
+
+ return collect(futures)
+ .then([layerPaths]() {
+ vector<string> layerIds;
+ foreach (const auto& layerPath, layerPaths) {
+ layerIds.push_back(layerPath.first);
+ }
+
+ return layerIds;
+ });
+}
+
+
+Future<Image> StoreProcess::storeImage(
+ const Image::Name& name,
+ const vector<string>& layerIds)
+{
+ return metadataManager->put(name, layerIds);
+}
+
+
+Future<Nothing> StoreProcess::moveLayer(const pair<string, string>& layerPath)
+{
+ if (!os::exists(layerPath.second)) {
+ return Failure("Unable to find layer '" + layerPath.first + "' in '" +
+ layerPath.second + "'");
+ }
+
+ const string imageLayerPath =
+ paths::getImageLayerPath(flags.docker_store_dir, layerPath.first);
+
+ if (!os::exists(imageLayerPath)) {
+ Try<Nothing> mkdir = os::mkdir(imageLayerPath);
+ if (mkdir.isError()) {
+ return Failure("Failed to create layer path in store for id '" +
+ layerPath.first + "': " + mkdir.error());
+ }
+ }
+
+ Try<Nothing> status = os::rename(
+ layerPath.second,
+ paths::getImageLayerRootfsPath(
+ flags.docker_store_dir, layerPath.first));
+
+ if (status.isError()) {
+ return Failure("Failed to move layer '" + layerPath.first +
+ "' to store directory: " + status.error());
+ }
+
+ return Nothing();
+}
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9a722d74/src/slave/containerizer/mesos/provisioner/docker/store.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.hpp b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
new file mode 100644
index 0000000..95e46b9
--- /dev/null
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROVISIONER_DOCKER_STORE_HPP__
+#define __PROVISIONER_DOCKER_STORE_HPP__
+
+#include <string>
+
+#include <stout/try.hpp>
+
+#include <process/future.hpp>
+
+#include "slave/containerizer/mesos/provisioner/store.hpp"
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+namespace docker {
+
+// Forward Declarations.
+class Puller;
+class StoreProcess;
+
+
+// Store fetches the Docker images and stores them on disk.
+class Store : public slave::Store
+{
+public:
+ static Try<process::Owned<slave::Store>> create(const Flags& flags);
+
+ ~Store();
+
+ process::Future<Nothing> recover();
+
+ process::Future<std::vector<std::string>> get(const mesos::Image& image);
+
+private:
+ explicit Store(const process::Owned<StoreProcess>& _process);
+
+ Store& operator=(const Store&) = delete; // Not assignable.
+ Store(const Store&) = delete; // Not copyable.
+
+ process::Owned<StoreProcess> process;
+};
+
+} // namespace docker {
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __PROVISIONER_DOCKER_STORE_HPP__