You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/24 00:11:25 UTC
[1/3] mesos git commit: Refactored and simplified the docker puller
interfaces.
Repository: mesos
Updated Branches:
refs/heads/master 60f211afe -> 14f070fda
Refactored and simplified the docker puller interfaces.
Review: https://reviews.apache.org/r/43801
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/caf6c029
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/caf6c029
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/caf6c029
Branch: refs/heads/master
Commit: caf6c02989cfc9236d0ae03eaad4844d6763d5c0
Parents: 60f211a
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Feb 19 21:41:01 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:07 2016 -0800
----------------------------------------------------------------------
.../mesos/provisioner/docker/local_puller.cpp | 220 ++++++++++++-------
.../mesos/provisioner/docker/local_puller.hpp | 11 +-
.../mesos/provisioner/docker/paths.cpp | 55 ++---
.../mesos/provisioner/docker/paths.hpp | 48 ++--
.../mesos/provisioner/docker/puller.cpp | 133 +----------
.../mesos/provisioner/docker/puller.hpp | 47 +---
.../provisioner/docker/registry_puller.cpp | 213 ++++++++++--------
.../provisioner/docker/registry_puller.hpp | 36 ++-
.../mesos/provisioner/docker/store.cpp | 139 ++++++------
.../mesos/provisioner/docker/store.hpp | 2 +-
.../containerizer/provisioner_docker_tests.cpp | 37 ++--
11 files changed, 404 insertions(+), 537 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
index f3e7c04..dfd32dc 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.cpp
@@ -14,8 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <list>
-#include <map>
+#include <string>
#include <vector>
#include <glog/logging.h>
@@ -28,22 +27,18 @@
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
+#include <process/process.hpp>
-#include "common/status_utils.hpp"
#include "common/command_utils.hpp"
#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
using namespace process;
namespace spec = docker::spec;
using std::list;
-using std::map;
-using std::pair;
using std::string;
using std::vector;
@@ -55,23 +50,32 @@ namespace docker {
class LocalPullerProcess : public Process<LocalPullerProcess>
{
public:
- LocalPullerProcess(const string& _archivesDir) : archivesDir(_archivesDir) {}
+ LocalPullerProcess(const string& _archivesDir)
+ : archivesDir(_archivesDir) {}
~LocalPullerProcess() {}
- Future<list<pair<string, string>>> pull(
+ Future<vector<string>> pull(
const spec::ImageReference& reference,
const string& directory);
private:
- Future<list<pair<string, string>>> putImage(
+ Future<vector<string>> _pull(
const spec::ImageReference& reference,
const string& directory);
- Future<list<pair<string, string>>> putLayers(
+ Result<string> getParentLayerId(
+ const string& directory,
+ const string& layerId);
+
+ Future<Nothing> extractLayers(
const string& directory,
const vector<string>& layerIds);
+ Future<Nothing> extractLayer(
+ const string& directory,
+ const string& layerId);
+
const string archivesDir;
};
@@ -90,7 +94,7 @@ Try<Owned<Puller>> LocalPuller::create(const Flags& flags)
}
-LocalPuller::LocalPuller(Owned<LocalPullerProcess>& _process)
+LocalPuller::LocalPuller(Owned<LocalPullerProcess> _process)
: process(_process)
{
spawn(process.get());
@@ -104,16 +108,19 @@ LocalPuller::~LocalPuller()
}
-Future<list<pair<string, string>>> LocalPuller::pull(
+Future<vector<string>> LocalPuller::pull(
const spec::ImageReference& reference,
- const Path& directory)
+ const string& directory)
{
return dispatch(
- process.get(), &LocalPullerProcess::pull, reference, directory);
+ process.get(),
+ &LocalPullerProcess::pull,
+ reference,
+ directory);
}
-Future<list<pair<string, string>>> LocalPullerProcess::pull(
+Future<vector<string>> LocalPullerProcess::pull(
const spec::ImageReference& reference,
const string& directory)
{
@@ -133,110 +140,157 @@ Future<list<pair<string, string>>> LocalPullerProcess::pull(
<< "' to '" << directory << "'";
return command::untar(Path(tarPath), Path(directory))
- .then(defer(self(), &Self::putImage, reference, directory));
+ .then(defer(self(), &Self::_pull, reference, directory));
}
-static Result<string> getParentId(
- const string& directory,
- const string& layerId)
+Future<vector<string>> LocalPullerProcess::_pull(
+ const spec::ImageReference& reference,
+ const string& directory)
{
- Try<string> manifest =
- os::read(paths::getImageArchiveLayerManifestPath(directory, layerId));
-
- if (manifest.isError()) {
- return Error("Failed to read manifest: " + manifest.error());
+ // We first parse the 'repositories' JSON file to get the topmost
+ // layer id for the image.
+ Try<string> _repositories = os::read(path::join(directory, "repositories"));
+ if (_repositories.isError()) {
+ return Failure("Failed to read 'repositories': " + _repositories.error());
}
- Try<JSON::Object> json = JSON::parse<JSON::Object>(manifest.get());
- if (json.isError()) {
- return Error("Failed to parse manifest: " + json.error());
- }
+ Try<JSON::Object> repositories =
+ JSON::parse<JSON::Object>(_repositories.get());
- Result<JSON::String> parentId = json.get().find<JSON::String>("parent");
- if (parentId.isNone() || (parentId.isSome() && parentId.get() == "")) {
- return None();
- } else if (parentId.isError()) {
- return Error("Failed to read parent of layer: " + parentId.error());
+ if (repositories.isError()) {
+ return Failure("Failed to parse 'repositories': " + repositories.error());
}
- return parentId.get().value;
-}
+ Result<JSON::Object> repository =
+ repositories->find<JSON::Object>(reference.repository());
+ if (repository.isError()) {
+ return Failure(
+ "Failed to find repository '" + reference.repository() +
+ "' in 'repositories': " + repository.error());
+ } else if (repository.isNone()) {
+ return Failure(
+ "Repository '" + reference.repository() + "' does not "
+ "exist in 'repositories'");
+ }
-Future<list<pair<string, string>>> LocalPullerProcess::putImage(
- const spec::ImageReference& reference,
- const string& directory)
-{
- Try<string> value =
- os::read(paths::getImageArchiveRepositoriesPath(directory));
+ const string tag = reference.has_tag()
+ ? reference.tag()
+ : "latest";
- if (value.isError()) {
- return Failure("Failed to read repository JSON: " + value.error());
+ // NOTE: We don't use JSON find here since a tag might contain '.'.
+ if (repository->values.count(tag) == 0) {
+ return Failure("Tag '" + tag + "' is not found");
}
- Try<JSON::Object> json = JSON::parse<JSON::Object>(value.get());
- if (json.isError()) {
- return Failure("Failed to parse JSON: " + json.error());
+ JSON::Value _layerId = repository->values.at(tag);
+ if (!_layerId.is<JSON::String>()) {
+ return Failure("Layer id is not a string");
}
- Result<JSON::Object> repositoryValue =
- json.get().find<JSON::Object>(reference.repository());
+ string layerId = _layerId.as<JSON::String>().value;
+
+ // Traverse to find all parent image layer ids. Here, we assume
+ // that all the parent layers are part of the archive tar, thus are
+ // already extracted under 'directory'.
+ vector<string> layerIds = { layerId };
+ Result<string> parentLayerId = getParentLayerId(directory, layerId);
+ while (parentLayerId.isSome()) {
+ // NOTE: We put parent layer ids in front because that's what the
+ // provisioner backends assume.
+ layerIds.insert(layerIds.begin(), parentLayerId.get());
+ parentLayerId = getParentLayerId(directory, parentLayerId.get());
+ }
- if (repositoryValue.isError()) {
- return Failure("Failed to find repository: " + repositoryValue.error());
- } else if (repositoryValue.isNone()) {
- return Failure("Repository '" + reference.repository() + "' is not found");
+ if (parentLayerId.isError()) {
+ return Failure(
+ "Failed to find parent layer id for layer '" + layerId +
+ "': " + parentLayerId.error());
}
- const JSON::Object repositoryJson = repositoryValue.get();
+ return extractLayers(directory, layerIds)
+ .then([layerIds]() { return layerIds; });
+}
- const string tag = reference.has_tag()
- ? reference.tag()
- : "latest";
- // We don't use JSON find here because a tag might contain a '.'.
- map<string, JSON::Value>::const_iterator entry =
- repositoryJson.values.find(tag);
+Result<string> LocalPullerProcess::getParentLayerId(
+ const string& directory,
+ const string& layerId)
+{
+ const string layerPath = path::join(directory, layerId);
- if (entry == repositoryJson.values.end()) {
- return Failure("Tag '" + tag + "' is not found");
- } else if (!entry->second.is<JSON::String>()) {
- return Failure("Tag JSON value expected to be JSON::String");
+ Try<string> _manifest = os::read(paths::getImageLayerManifestPath(layerPath));
+ if (_manifest.isError()) {
+ return Error("Failed to read manifest: " + _manifest.error());
}
- const string layerId = entry->second.as<JSON::String>().value;
-
- vector<string> layerIds;
- layerIds.push_back(layerId);
- Result<string> parentId = getParentId(directory, layerId);
-
- while (parentId.isSome()) {
- layerIds.insert(layerIds.begin(), parentId.get());
- parentId = getParentId(directory, parentId.get());
+ Try<JSON::Object> manifest = JSON::parse<JSON::Object>(_manifest.get());
+ if (manifest.isError()) {
+ return Error("Failed to parse manifest: " + manifest.error());
}
- if (parentId.isError()) {
- return Failure("Failed to find parent layer id of layer '" + layerId +
- "': " + parentId.error());
+ Result<JSON::Value> parentLayerId = manifest->find<JSON::Value>("parent");
+ if (parentLayerId.isError()) {
+ return Error("Failed to parse 'parent': " + parentLayerId.error());
+ } else if (parentLayerId.isNone()) {
+ return None();
+ } else if (parentLayerId->is<JSON::Null>()) {
+ return None();
+ } else if (!parentLayerId->is<JSON::String>()) {
+ return Error("Unexpected 'parent' type");
}
- return putLayers(directory, layerIds);
+ const string id = parentLayerId->as<JSON::String>().value;
+ if (id == "") {
+ return None();
+ } else {
+ return id;
+ }
}
-Future<list<pair<string, string>>> LocalPullerProcess::putLayers(
+Future<Nothing> LocalPullerProcess::extractLayers(
const string& directory,
const vector<string>& layerIds)
{
- list<Future<pair<string, string>>> futures;
+ list<Future<Nothing>> futures;
foreach (const string& layerId, layerIds) {
- const string tarredLayer =
- paths::getImageArchiveLayerTarPath(directory, layerId);
- futures.push_back(untarLayer(tarredLayer, directory, layerId));
+ futures.push_back(extractLayer(directory, layerId));
+ }
+
+ return collect(futures)
+ .then([]() { return Nothing(); });
+}
+
+
+Future<Nothing> LocalPullerProcess::extractLayer(
+ const string& directory,
+ const string& layerId)
+{
+ const string layerPath = path::join(directory, layerId);
+ const string tar = paths::getImageLayerTarPath(layerPath);
+ const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+
+ Try<Nothing> mkdir = os::mkdir(rootfs);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create directory '" + rootfs + "'"
+ ": " + mkdir.error());
}
- return collect(futures);
+ return command::untar(Path(tar), Path(rootfs))
+ .then([tar]() -> Future<Nothing> {
+ // Remove the tar after the extraction.
+ Try<Nothing> rm = os::rm(tar);
+ if (rm.isError()) {
+ return Failure(
+ "Failed to remove '" + tar + "' "
+ "after extraction: " + rm.error());
+ }
+
+ return Nothing();
+ });
}
} // namespace docker {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
index 811c24b..5f5aaa3 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/local_puller.hpp
@@ -17,9 +17,9 @@
#ifndef __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
#define __PROVISIONER_DOCKER_LOCAL_PULLER_HPP__
-#include <mesos/docker/spec.hpp>
+#include <process/owned.hpp>
-#include "slave/containerizer/mesos/provisioner/store.hpp"
+#include <mesos/docker/spec.hpp>
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
@@ -46,15 +46,14 @@ public:
~LocalPuller();
- process::Future<std::list<std::pair<std::string, std::string>>> pull(
+ process::Future<std::vector<std::string>> pull(
const ::docker::spec::ImageReference& reference,
- const Path& directory);
+ const std::string& directory);
private:
- explicit LocalPuller(process::Owned<LocalPullerProcess>& _process);
+ explicit LocalPuller(process::Owned<LocalPullerProcess> _process);
LocalPuller(const LocalPuller&) = delete; // Not copyable.
-
LocalPuller& operator=(const LocalPuller&) = delete; // Not assignable.
process::Owned<LocalPullerProcess> process;
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
index 82d92a2..a5cc952 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.cpp
@@ -38,74 +38,51 @@ string getStagingTempDir(const string& storeDir)
}
-string getImageArchiveTarPath(
- const string& discoveryDir,
- const string& name)
+string getImageLayerPath(const string& storeDir, const string& layerId)
{
- return path::join(discoveryDir, name + ".tar");
-}
-
-
-string getImageArchiveRepositoriesPath(const string& archivePath)
-{
- return path::join(archivePath, "repositories");
+ return path::join(storeDir, "layers", layerId);
}
-string getImageArchiveLayerPath(
- const string& archivePath,
- const string& layerId)
+string getImageLayerManifestPath(const string& layerPath)
{
- return path::join(archivePath, layerId);
+ return path::join(layerPath, "json");
}
-string getImageArchiveLayerManifestPath(
- const string& archivePath,
- const string& layerId)
+string getImageLayerManifestPath(const string& storeDir, const string& layerId)
{
- return path::join(getImageArchiveLayerPath(archivePath, layerId), "json");
+ return getImageLayerManifestPath(getImageLayerPath(storeDir, layerId));
}
-string getImageArchiveLayerTarPath(
- const string& archivePath,
- const string& layerId)
+string getImageLayerRootfsPath(const string& layerPath)
{
- return path::join(
- getImageArchiveLayerPath(archivePath, layerId), "layer.tar");
+ return path::join(layerPath, "rootfs");
}
-string getImageArchiveLayerRootfsPath(
- const string& archivePath,
- const string& layerId)
+string getImageLayerRootfsPath(const string& storeDir, const string& layerId)
{
- return path::join(getImageArchiveLayerPath(archivePath, layerId), "rootfs");
+ return getImageLayerRootfsPath(getImageLayerPath(storeDir, layerId));
}
-string getImageLayerPath(
- const string& storeDir,
- const string& layerId)
+string getImageLayerTarPath(const string& layerPath)
{
- return path::join(storeDir, "layers", layerId);
+ return path::join(layerPath, "layer.tar");
}
-string getImageLayerManifestPath(
- const string& storeDir,
- const string& layerId)
+string getImageLayerTarPath(const string& storeDir, const string& layerId)
{
- return path::join(getImageLayerPath(storeDir, layerId), "json");
+ return getImageLayerTarPath(getImageLayerPath(storeDir, layerId));
}
-string getImageLayerRootfsPath(
- const string& storeDir,
- const string& layerId)
+string getImageArchiveTarPath(const string& discoveryDir, const string& name)
{
- return path::join(getImageLayerPath(storeDir, layerId), "rootfs");
+ return path::join(discoveryDir, name + ".tar");
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
index d2b0cf9..949e0c1 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/paths.hpp
@@ -17,11 +17,8 @@
#ifndef __PROVISIONER_DOCKER_PATHS_HPP__
#define __PROVISIONER_DOCKER_PATHS_HPP__
-#include <list>
#include <string>
-#include <mesos/mesos.hpp>
-
namespace mesos {
namespace internal {
namespace slave {
@@ -32,8 +29,7 @@ namespace paths {
* The Docker store file system layout is as follows:
* Image store dir ('--docker_store_dir' slave flag)
* |--staging
- * |-- repositories(containing all images info as JSON)
- * |-- <temp_dir_archive>
+ * |-- <staging_tmp_dir_XXXXXX>
* |-- <layer_id>
* |-- rootfs
* |-- json(manifest)
@@ -53,49 +49,43 @@ std::string getStagingDir(const std::string& storeDir);
std::string getStagingTempDir(const std::string& storeDir);
-std::string getImageArchiveTarPath(
- const std::string& discoveryDir,
- const std::string& name);
-
-
-std::string getImageArchiveRepositoriesPath(const std::string& archivePath);
-
-
-std::string getImageArchiveLayerPath(
- const std::string& archivePath,
+std::string getImageLayerPath(
+ const std::string& storeDir,
const std::string& layerId);
-std::string getImageArchiveLayerManifestPath(
- const std::string& archivePath,
- const std::string& layerId);
+std::string getImageLayerManifestPath(
+ const std::string& layerPath);
-std::string getImageArchiveLayerTarPath(
- const std::string& archivePath,
- const std::string& layerId);
+std::string getImageLayerManifestPath(
+ const std::string& storeDir,
+ const std::string& layerId);
-std::string getImageArchiveLayerRootfsPath(
- const std::string& archivePath,
- const std::string& layerId);
+std::string getImageLayerRootfsPath(
+ const std::string& layerPath);
-std::string getImageLayerPath(
+std::string getImageLayerRootfsPath(
const std::string& storeDir,
const std::string& layerId);
-std::string getImageLayerManifestPath(
- const std::string& storeDir,
- const std::string& layerId);
+std::string getImageLayerTarPath(
+ const std::string& layerPath);
-std::string getImageLayerRootfsPath(
+std::string getImageLayerTarPath(
const std::string& storeDir,
const std::string& layerId);
+std::string getImageArchiveTarPath(
+ const std::string& discoveryDir,
+ const std::string& name);
+
+
std::string getStoredImagesPath(const std::string& storeDir);
} // namespace paths {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
index a239b97..d012ae4 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -14,34 +14,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <string>
-#include <tuple>
-#include <utility>
-#include <vector>
-
-#include <stout/os.hpp>
-
-#include <process/check.hpp>
-#include <process/collect.hpp>
-#include <process/io.hpp>
-#include <process/subprocess.hpp>
-
-#include "common/status_utils.hpp"
+#include <stout/strings.hpp>
+#include <stout/try.hpp>
#include "slave/containerizer/mesos/provisioner/docker/local_puller.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
-using std::pair;
-using std::string;
-using std::tuple;
-using std::vector;
-
-using process::Failure;
-using process::Future;
using process::Owned;
-using process::Subprocess;
namespace mesos {
namespace internal {
@@ -68,115 +48,6 @@ Try<Owned<Puller>> Puller::create(const Flags& flags)
return puller.get();
}
-
-Future<Nothing> untar(const string& file, const string& directory)
-{
- const vector<string> argv = {
- "tar",
- "-C",
- directory,
- "-x",
- "-f",
- file
- };
-
- Try<Subprocess> s = subprocess(
- "tar",
- argv,
- Subprocess::PATH("/dev/null"),
- Subprocess::PATH("/dev/null"),
- Subprocess::PIPE());
-
- if (s.isError()) {
- return Failure("Failed to execute the subprocess: " + s.error());
- }
-
- return await(
- s.get().status(),
- process::io::read(s.get().err().get()))
- .then([](const tuple<
- Future<Option<int>>,
- Future<string>>& t) -> Future<Nothing> {
- Future<Option<int>> status = std::get<0>(t);
- if (!status.isReady()) {
- return Failure(
- "Failed to get the exit status of the subprocess: " +
- (status.isFailed() ? status.failure() : "discarded"));
- }
-
- Future<string> error = std::get<1>(t);
- if (!error.isReady()) {
- return Failure(
- "Failed to read stderr from the subprocess: " +
- (error.isFailed() ? error.failure() : "discarded"));
- }
-
- if (status->isNone()) {
- return Failure("Failed to reap the subprocess");
- }
-
- if (status->get() != 0) {
- return Failure(
- "Unexpected result from the subprocess: " +
- WSTRINGIFY(status->get()) +
- ", stderr='" + error.get() + "'");
- }
-
- return Nothing();
- });
-}
-
-
-Future<pair<string, string>> untarLayer(
- const string& file,
- const string& directory,
- const string& layerId)
-{
- // We untar the layer from source into a directory, then move the layer
- // into store. We do this instead of untarring directly to store to make
- // sure we don't end up with partially untarred layer rootfs.
- const string localRootfsPath =
- paths::getImageArchiveLayerRootfsPath(directory, layerId);
-
- // Image layer has been untarred but is not present in the store directory.
- if (os::exists(localRootfsPath)) {
- LOG(WARNING) << "Image layer '" << layerId << "' rootfs present in staging "
- << "directory but not in store directory '"
- << localRootfsPath << "'. Removing staged rootfs and untarring"
- << "layer again.";
-
- Try<Nothing> rmdir = os::rmdir(localRootfsPath);
- if (rmdir.isError()) {
- return Failure(
- "Failed to remove incomplete staged rootfs for layer "
- "'" + layerId + "': " + rmdir.error());
- }
- }
-
- Try<Nothing> mkdir = os::mkdir(localRootfsPath);
- if (mkdir.isError()) {
- return Failure(
- "Failed to create rootfs path '" + localRootfsPath + "'"
- ": " + mkdir.error());
- }
-
- // The tar file will be removed when the staging directory is removed.
- return untar(file, localRootfsPath)
- .then([directory, layerId]() -> Future<pair<string, string>> {
- const string layerPath =
- paths::getImageArchiveLayerPath(directory, layerId);
-
- if (!os::exists(layerPath)) {
- return Failure(
- "Failed to find the rootfs path after extracting layer"
- " '" + layerId + "'");
- }
-
- return pair<string, string>(layerId, layerPath);
- });
-}
-
-
} // namespace docker {
} // namespace slave {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
index 5b2d72c..51894dd 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -17,12 +17,9 @@
#ifndef __PROVISIONER_DOCKER_PULLER_HPP__
#define __PROVISIONER_DOCKER_PULLER_HPP__
-#include <list>
-#include <utility>
+#include <string>
+#include <vector>
-#include <stout/duration.hpp>
-#include <stout/option.hpp>
-#include <stout/path.hpp>
#include <stout/try.hpp>
#include <process/future.hpp>
@@ -47,47 +44,17 @@ public:
/**
* Pull a Docker image layers into the specified directory, and
* return the list of layer ids in that image in the right
- * dependency order, and also return the directory where
- * the puller puts its changeset.
+ * dependency order (i.e., base images are at the front).
*
- * @param name The name of the image.
+ * @param reference The docker image reference.
* @param directory The target directory to store the layers.
- * @return list of layers maped to its local directory ordered by its
- * dependency.
+ * @return an ordered list of layer ids.
*/
- virtual process::Future<std::list<std::pair<std::string, std::string>>> pull(
+ virtual process::Future<std::vector<std::string>> pull(
const ::docker::spec::ImageReference& reference,
- const Path& directory) = 0;
+ const std::string& directory) = 0;
};
-
-/**
- * Untars(extracts) the tar file(input param) to the given output directory.
- *
- * @param file tar file to be extracted.
- * @param directory target directory for extracting the tar file.
- */
-process::Future<Nothing> untar(
- const std::string& file,
- const std::string& directory);
-
-
-/**
- * Untars a tarred layer changeset into staging directory with the
- * directory structure:
- * |--staging directory
- * |-- <layer_id>
- * |-- rootfs
- *
- * @param file path to the tar file holding the Docker layer.
- * @param directory staging directory.
- * @return layer Id mapping to the rootfs path of the layer.
- */
-process::Future<std::pair<std::string, std::string>> untarLayer(
- const std::string& file,
- const std::string& directory,
- const std::string& layerId);
-
} // namespace docker {
} // namespace slave {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
index 3fcf147..d09e2ec 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
@@ -14,25 +14,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
-
-#include <list>
-
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
-#include "common/status_utils.hpp"
+#include <stout/os/mkdir.hpp>
+#include <stout/os/rm.hpp>
+
+#include "common/command_utils.hpp"
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
+#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
namespace http = process::http;
namespace spec = docker::spec;
using std::list;
-using std::pair;
using std::string;
using std::vector;
@@ -41,7 +39,6 @@ using process::Future;
using process::Owned;
using process::Process;
using process::Promise;
-using process::Subprocess;
namespace mesos {
namespace internal {
@@ -56,37 +53,37 @@ class RegistryPullerProcess : public Process<RegistryPullerProcess>
public:
static Try<Owned<RegistryPullerProcess>> create(const Flags& flags);
- process::Future<list<pair<string, string>>> pull(
+ process::Future<vector<string>> pull(
const spec::ImageReference& reference,
- const Path& directory);
+ const string& directory);
private:
- explicit RegistryPullerProcess(
+ RegistryPullerProcess(
const Owned<RegistryClient>& registry,
const Duration& timeout);
- Future<pair<string, string>> downloadLayer(
- const spec::ImageReference& reference,
- const Path& directory,
- const string& blobSum,
- const string& id);
-
- Future<list<pair<string, string>>> downloadLayers(
+ Future<vector<string>> downloadLayers(
const spec::v2::ImageManifest& manifest,
const spec::ImageReference& reference,
- const Path& downloadDir);
+ const string& directory);
- process::Future<list<pair<string, string>>> _pull(
+ Future<Nothing> downloadLayer(
const spec::ImageReference& reference,
- const Path& downloadDir);
+ const string& directory,
+ const string& blobSum,
+ const string& layerId);
- Future<list<pair<string, string>>> untarLayers(
- const Future<list<pair<string, string>>>& layerFutures,
- const Path& downloadDir);
+ Future<vector<string>> untarLayers(
+ const string& directory,
+ const vector<string>& layerIds);
+
+ Future<Nothing> untarLayer(
+ const string& directory,
+ const string& layerId);
Owned<RegistryClient> registryClient_;
const Duration pullTimeout_;
- hashmap<string, Owned<Promise<pair<string, string>>>> downloadTracker_;
+ hashmap<string, Owned<Promise<Nothing>>> downloadTracker_;
RegistryPullerProcess(const RegistryPullerProcess&) = delete;
RegistryPullerProcess& operator=(const RegistryPullerProcess&) = delete;
@@ -106,29 +103,29 @@ Try<Owned<Puller>> RegistryPuller::create(const Flags& flags)
}
-RegistryPuller::RegistryPuller(const Owned<RegistryPullerProcess>& process)
- : process_(process)
+RegistryPuller::RegistryPuller(Owned<RegistryPullerProcess> _process)
+ : process(_process)
{
- spawn(CHECK_NOTNULL(process_.get()));
+ spawn(CHECK_NOTNULL(process.get()));
}
RegistryPuller::~RegistryPuller()
{
- terminate(process_.get());
- process::wait(process_.get());
+ terminate(process.get());
+ process::wait(process.get());
}
-Future<list<pair<string, string>>> RegistryPuller::pull(
+Future<vector<string>> RegistryPuller::pull(
const spec::ImageReference& reference,
- const Path& downloadDir)
+ const string& directory)
{
return dispatch(
- process_.get(),
+ process.get(),
&RegistryPullerProcess::pull,
reference,
- downloadDir);
+ directory);
}
@@ -173,9 +170,58 @@ RegistryPullerProcess::RegistryPullerProcess(
pullTimeout_(timeout) {}
-Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
+Future<vector<string>> RegistryPullerProcess::pull(
+ const spec::ImageReference& reference,
+ const string& directory)
+{
+ // TODO(jojy): Have one outgoing manifest request per image.
+ return registryClient_->getManifest(reference)
+ .then(process::defer(self(), [=](const spec::v2::ImageManifest& manifest) {
+ return downloadLayers(manifest, reference, directory);
+ }))
+ .then(process::defer(self(), [=](const vector<string>& layerIds) {
+ return untarLayers(directory, layerIds);
+ }))
+ .after(pullTimeout_, [reference](Future<vector<string>> future) {
+ future.discard();
+ return Failure("Timed out");
+ });
+}
+
+
+Future<vector<string>> RegistryPullerProcess::downloadLayers(
+ const spec::v2::ImageManifest& manifest,
const spec::ImageReference& reference,
- const Path& directory,
+ const string& directory)
+{
+ list<Future<Nothing>> futures;
+ vector<string> layerIds;
+
+ CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
+
+ for (int i = 0; i < manifest.fslayers_size(); i++) {
+ CHECK(manifest.history(i).has_v1());
+
+ layerIds.push_back(manifest.history(i).v1().id());
+
+ futures.push_back(downloadLayer(
+ reference,
+ directory,
+ manifest.fslayers(i).blobsum(),
+ manifest.history(i).v1().id()));
+ }
+
+ // TODO(jojy): Delete downloaded files in the directory on discard and
+ // failure?
+ // TODO(jojy): Iterate through the futures and log the failed future.
+ return collect(futures)
+ .then([layerIds]() { return layerIds; });
+}
+
+
+Future<Nothing> RegistryPullerProcess::downloadLayer(
+ const spec::ImageReference& reference,
+ const string& directory,
const string& blobSum,
const string& layerId)
{
@@ -189,8 +235,7 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
return downloadTracker_.at(layerId)->future();
}
- Owned<Promise<pair<string, string>>> downloadPromise(
- new Promise<pair<string, string>>());
+ Owned<Promise<Nothing>> downloadPromise(new Promise<Nothing>());
downloadTracker_.insert({layerId, downloadPromise});
@@ -216,7 +261,7 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
downloadPromise->fail(
"Failed to download layer '" + layerId + "': no content");
} else {
- downloadPromise->set({layerId, downloadFile});
+ downloadPromise->set(Nothing());
}
}));
@@ -224,71 +269,51 @@ Future<pair<string, string>> RegistryPullerProcess::downloadLayer(
}
-Future<list<pair<string, string>>> RegistryPullerProcess::pull(
- const spec::ImageReference& reference,
- const Path& directory)
+Future<vector<string>> RegistryPullerProcess::untarLayers(
+ const string& directory,
+ const vector<string>& layerIds)
{
- // TODO(jojy): Have one outgoing manifest request per image.
- return registryClient_->getManifest(reference)
- .then(process::defer(self(), [this, directory, reference](
- const spec::v2::ImageManifest& manifest) {
- return downloadLayers(manifest, reference, directory);
- }))
- .then(process::defer(self(), [this, directory](
- const Future<list<pair<string, string>>>& layerFutures)
- -> Future<list<pair<string, string>>> {
- return untarLayers(layerFutures, directory);
- }))
- .after(pullTimeout_, [reference](
- Future<list<pair<string, string>>> future) {
- future.discard();
+ list<Future<Nothing>> futures;
+ foreach (const string& layerId, layerIds) {
+ VLOG(1) << "Untarring layer '" << layerId
+ << "' downloaded from registry to directory '"
+ << directory << "'";
- return Failure("Timed out");
- });
-}
-
-
-Future<list<pair<string, string>>> RegistryPullerProcess::downloadLayers(
- const spec::v2::ImageManifest& manifest,
- const spec::ImageReference& reference,
- const Path& directory)
-{
- list<Future<pair<string, string>>> downloadFutures;
-
- CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
-
- for (int i = 0; i < manifest.fslayers_size(); i++) {
- CHECK(manifest.history(i).has_v1());
-
- downloadFutures.push_back(
- downloadLayer(reference,
- directory,
- manifest.fslayers(i).blobsum(),
- manifest.history(i).v1().id()));
+ futures.emplace_back(untarLayer(directory, layerId));
}
- // TODO(jojy): Delete downloaded files in the directory on discard and
- // failure?
- // TODO(jojy): Iterate through the futures and log the failed future.
- return collect(downloadFutures);
+ return collect(futures)
+ .then([layerIds]() { return layerIds; });
}
-Future<list<pair<string, string>>> RegistryPullerProcess::untarLayers(
- const Future<list<pair<string, string>>>& layerFutures,
- const Path& directory)
+Future<Nothing> RegistryPullerProcess::untarLayer(
+ const string& directory,
+ const string& layerId)
{
- list<Future<pair<string, string>>> untarFutures;
-
- pair<string, string> layerInfo;
- foreach (layerInfo, layerFutures.get()) {
- VLOG(1) << "Untarring layer '" << layerInfo.first
- << "' downloaded from registry to directory '" << directory << "'";
- untarFutures.emplace_back(
- untarLayer(layerInfo.second, directory, layerInfo.first));
+ const string layerPath = path::join(directory, layerId);
+ const string tar = path::join(directory, layerId + ".tar");
+ const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+
+ Try<Nothing> mkdir = os::mkdir(rootfs);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create directory '" + rootfs + "'"
+ ": " + mkdir.error());
}
- return collect(untarFutures);
+ return command::untar(Path(tar), Path(rootfs))
+ .then([tar]() -> Future<Nothing> {
+ // Remove the tar after the extraction.
+ Try<Nothing> rm = os::rm(tar);
+ if (rm.isError()) {
+ return Failure(
+ "Failed to remove '" + tar + "' "
+ "after extraction: " + rm.error());
+ }
+
+ return Nothing();
+ });
}
} // namespace docker {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
index bccbac1..c429df9 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
@@ -17,21 +17,14 @@
#ifndef __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
#define __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
-#include <list>
-#include <string>
-#include <utility>
+#include <process/owned.hpp>
-#include <stout/duration.hpp>
-#include <stout/path.hpp>
-
-#include <process/future.hpp>
-#include <process/http.hpp>
-#include <process/process.hpp>
-
-#include <mesos/docker/spec.hpp>
+#include <stout/try.hpp>
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+#include "slave/flags.hpp"
+
namespace mesos {
namespace internal {
namespace slave {
@@ -41,36 +34,33 @@ namespace docker {
class RegistryPullerProcess;
/*
- * Pulls an image from registry.
+ * Pulls an image from docker registry.
*/
class RegistryPuller : public Puller
{
public:
- /**
- * Factory method for creating RegistryPuller.
- */
static Try<process::Owned<Puller>> create(const Flags& flags);
~RegistryPuller();
/**
- * Pulls an image into a download directory. This image could consist
- * multiple layers of blobs.
+ * Pulls an image into a download directory. This image could
+ * consist of multiple layers of blobs.
*
* @param reference local name of the image.
- * @param downloadDir path to which the layers should be downloaded.
+ * @param directory path to which the layers will be downloaded.
*/
- process::Future<std::list<std::pair<std::string, std::string>>> pull(
+ process::Future<std::vector<std::string>> pull(
const ::docker::spec::ImageReference& reference,
- const Path& downloadDir);
+ const std::string& directory);
private:
- RegistryPuller(const process::Owned<RegistryPullerProcess>& process);
-
- process::Owned<RegistryPullerProcess> process_;
+ RegistryPuller(process::Owned<RegistryPullerProcess> _process);
RegistryPuller(const RegistryPuller&) = delete;
RegistryPuller& operator=(const RegistryPuller&) = delete;
+
+ process::Owned<RegistryPullerProcess> process;
};
} // namespace docker {
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index 2f1d3e0..d802a79 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <list>
+#include <string>
#include <vector>
#include <glog/logging.h>
@@ -26,12 +26,9 @@
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
-#include <process/subprocess.hpp>
#include <mesos/docker/spec.hpp>
-#include "common/status_utils.hpp"
-
#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
@@ -42,7 +39,6 @@ using namespace process;
namespace spec = docker::spec;
using std::list;
-using std::pair;
using std::string;
using std::vector;
@@ -76,16 +72,15 @@ private:
Future<ImageInfo> __get(const Image& image);
Future<vector<string>> moveLayers(
- const list<pair<string, string>>& layerPaths);
-
- Future<Image> storeImage(
- const spec::ImageReference& reference,
+ const string& staging,
const vector<string>& layerIds);
Future<Nothing> moveLayer(
- const pair<string, string>& layerPath);
+ const string& staging,
+ const string& layerId);
const Flags flags;
+
Owned<MetadataManager> metadataManager;
Owned<Puller> puller;
hashmap<string, Owned<Promise<Image>>> pulling;
@@ -136,7 +131,7 @@ Try<Owned<slave::Store>> Store::create(
}
-Store::Store(const Owned<StoreProcess>& _process) : process(_process)
+Store::Store(Owned<StoreProcess> _process) : process(_process)
{
spawn(CHECK_NOTNULL(process.get()));
}
@@ -161,6 +156,12 @@ Future<ImageInfo> Store::get(const mesos::Image& image)
}
+Future<Nothing> StoreProcess::recover()
+{
+ return metadataManager->recover();
+}
+
+
Future<ImageInfo> StoreProcess::get(const mesos::Image& image)
{
if (image.type() != mesos::Image::DOCKER) {
@@ -199,19 +200,23 @@ Future<Image> StoreProcess::_get(
os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
if (staging.isError()) {
- return Failure("Failed to create a staging directory");
+ return Failure("Failed to create a staging directory: " + staging.error());
}
- const string imageReference = stringify(reference);
+ // If there is already a pull in progress for the given 'name', we
+ // will skip the additional pull.
+ const string name = stringify(reference);
- if (!pulling.contains(imageReference)) {
+ if (!pulling.contains(name)) {
Owned<Promise<Image>> promise(new Promise<Image>());
- Future<Image> future = puller->pull(reference, Path(staging.get()))
- .then(defer(self(), &Self::moveLayers, lambda::_1))
- .then(defer(self(), &Self::storeImage, reference, lambda::_1))
+ Future<Image> future = puller->pull(reference, staging.get())
+ .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1))
+ .then(defer(self(), [=](const vector<string>& layerIds) {
+ return metadataManager->put(reference, layerIds);
+ }))
.onAny(defer(self(), [=](const Future<Image>&) {
- pulling.erase(imageReference);
+ pulling.erase(name);
Try<Nothing> rmdir = os::rmdir(staging.get());
if (rmdir.isError()) {
@@ -221,12 +226,12 @@ Future<Image> StoreProcess::_get(
}));
promise->associate(future);
- pulling[imageReference] = promise;
+ pulling[name] = promise;
return promise->future();
}
- return pulling[imageReference]->future();
+ return pulling[name]->future();
}
@@ -234,11 +239,10 @@ Future<ImageInfo> StoreProcess::__get(const Image& image)
{
CHECK_LT(0, image.layer_ids_size());
- vector<string> layerDirectories;
- foreach (const string& layer, image.layer_ids()) {
- layerDirectories.push_back(
- paths::getImageLayerRootfsPath(
- flags.docker_store_dir, layer));
+ vector<string> layerPaths;
+ foreach (const string& layerId, image.layer_ids()) {
+ layerPaths.push_back(
+ paths::getImageLayerRootfsPath(flags.docker_store_dir, layerId));
}
// Read the manifest from the last layer because all runtime config
@@ -259,78 +263,61 @@ Future<ImageInfo> StoreProcess::__get(const Image& image)
return Failure("Failed to parse docker v1 manifest: " + v1.error());
}
- return ImageInfo{layerDirectories, v1.get()};
-}
-
-
-Future<Nothing> StoreProcess::recover()
-{
- return metadataManager->recover();
+ return ImageInfo{layerPaths, v1.get()};
}
Future<vector<string>> StoreProcess::moveLayers(
- const list<pair<string, string>>& layerPaths)
+ const string& staging,
+ const vector<string>& layerIds)
{
list<Future<Nothing>> futures;
- foreach (const auto& layerPath, layerPaths) {
- futures.push_back(moveLayer(layerPath));
+ foreach (const string& layerId, layerIds) {
+ futures.push_back(moveLayer(staging, layerId));
}
return collect(futures)
- .then([layerPaths]() {
- vector<string> layerIds;
- foreach (const auto& layerPath, layerPaths) {
- layerIds.push_back(layerPath.first);
- }
-
- return layerIds;
- });
-}
-
-
-Future<Image> StoreProcess::storeImage(
- const spec::ImageReference& reference,
- const vector<string>& layerIds)
-{
- return metadataManager->put(reference, layerIds);
+ .then([layerIds]() { return layerIds; });
}
Future<Nothing> StoreProcess::moveLayer(
- const pair<string, string>& layerPath)
+ const string& staging,
+ const string& layerId)
{
- if (!os::exists(layerPath.second)) {
- return Failure("Unable to find layer '" + layerPath.first + "' in '" +
- layerPath.second + "'");
+ const string source = path::join(staging, layerId);
+
+ // This is the case where the puller skips the pulling of the layer
+ // because the layer already exists in the store.
+ //
+ // TODO(jieyu): Verify that the layer is actually in the store.
+ if (!os::exists(source)) {
+ return Nothing();
}
- const string imageLayerPath =
- paths::getImageLayerPath(flags.docker_store_dir, layerPath.first);
-
- // If image layer path exists, we should remove it and make an empty
- // directory, because os::rename can only have empty or non-existed
- // directory as destination.
- if (os::exists(imageLayerPath)) {
- Try<Nothing> rmdir = os::rmdir(imageLayerPath);
- if (rmdir.isError()) {
- return Failure("Failed to remove existing layer: " + rmdir.error());
- }
+ const string target = paths::getImageLayerPath(
+ flags.docker_store_dir,
+ layerId);
+
+ // NOTE: Since the layer id is supposed to be unique, if the layer
+ // already exists in the store, we'll skip the move since the two
+ // copies are expected to be the same.
+ if (os::exists(target)) {
+ return Nothing();
}
- Try<Nothing> mkdir = os::mkdir(imageLayerPath);
+ Try<Nothing> mkdir = os::mkdir(target);
if (mkdir.isError()) {
- return Failure("Failed to create layer path in store for id '" +
- layerPath.first + "': " + mkdir.error());
+ return Failure(
+ "Failed to create directory in store for layer '" +
+ layerId + "': " + mkdir.error());
}
- Try<Nothing> status = os::rename(
- layerPath.second,
- imageLayerPath);
-
- if (status.isError()) {
- return Failure("Failed to move layer '" + layerPath.first +
- "' to store directory: " + status.error());
+ Try<Nothing> rename = os::rename(source, target);
+ if (rename.isError()) {
+ return Failure(
+ "Failed to move layer from '" + source +
+ "' to '" + target + "': " + rename.error());
}
return Nothing();
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/slave/containerizer/mesos/provisioner/docker/store.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.hpp b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
index 880e216..1c2b149 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.hpp
@@ -53,7 +53,7 @@ public:
virtual process::Future<ImageInfo> get(const mesos::Image& image);
private:
- explicit Store(const process::Owned<StoreProcess>& process);
+ explicit Store(process::Owned<StoreProcess> process);
Store& operator=(const Store&) = delete; // Not assignable.
Store(const Store&) = delete; // Not copyable.
http://git-wip-us.apache.org/repos/asf/mesos/blob/caf6c029/src/tests/containerizer/provisioner_docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp
index 4db6793..c6b1e69 100644
--- a/src/tests/containerizer/provisioner_docker_tests.cpp
+++ b/src/tests/containerizer/provisioner_docker_tests.cpp
@@ -766,12 +766,12 @@ TEST_F(RegistryClientTest, SimpleRegistryPuller)
Try<Owned<Puller>> registryPuller = RegistryPuller::create(flags);
ASSERT_SOME(registryPuller);
- const Path registryPullerPath(os::getcwd());
+ const string registryPullerPath = os::getcwd();
Try<spec::ImageReference> reference = spec::parseImageReference("busybox");
ASSERT_SOME(reference);
- Future<list<pair<string, string>>> registryPullerFuture =
+ Future<vector<string>> registryPullerFuture =
registryPuller.get()->pull(reference.get(), registryPullerPath);
const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
@@ -974,13 +974,17 @@ TEST_F(RegistryClientTest, SimpleRegistryPuller)
blobResponseSize));
AWAIT_ASSERT_READY(registryPullerFuture);
- list<pair<string, string>> layers = registryPullerFuture.get();
+ vector<string> layers = registryPullerFuture.get();
ASSERT_EQ(1u, layers.size());
- ASSERT_EQ(layers.front().first,
+ ASSERT_EQ(layers.front(),
"1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
- Try<string> blob = os::read(
- path::join(layers.front().second, "rootfs", blobFile));
+ Try<string> blob = os::read(path::join(
+ registryPullerPath,
+ layers.front(),
+ "rootfs",
+ blobFile));
+
ASSERT_SOME(blob);
ASSERT_EQ(blob.get(), blobResponse);
}
@@ -1154,16 +1158,16 @@ public:
MOCK_METHOD2(
pull,
- Future<list<pair<string, string>>>(
+ Future<vector<string>>(
const spec::ImageReference&,
- const Path&));
+ const string&));
- Future<list<pair<string, string>>> unmocked_pull(
- const spec::ImageReference& name,
- const Path& directory)
+ Future<vector<string>> unmocked_pull(
+ const spec::ImageReference& reference,
+ const string& directory)
{
// TODO(gilbert): Allow return list to be overridden.
- return list<pair<string, string>>();
+ return vector<string>();
}
};
@@ -1184,10 +1188,12 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
MockPuller* puller = new MockPuller();
Future<Nothing> pull;
- Promise<list<pair<string, string>>> promise;
+ Future<string> directory;
+ Promise<vector<string>> promise;
EXPECT_CALL(*puller, pull(_, _))
.WillOnce(testing::DoAll(FutureSatisfy(&pull),
+ FutureArg<1>(&directory),
Return(promise.future())));
Try<Owned<slave::Store>> store =
@@ -1200,11 +1206,12 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
Future<slave::ImageInfo> imageInfo1 = store.get()->get(mesosImage);
AWAIT_READY(pull);
+ AWAIT_READY(directory);
// TODO(gilbert): Need a helper method to create test layers
// which will allow us to set manifest so that we can add
// checks here.
- const string layerPath = path::join(os::getcwd(), "456");
+ const string layerPath = path::join(directory.get(), "456");
Try<Nothing> mkdir = os::mkdir(layerPath);
ASSERT_SOME(mkdir);
@@ -1220,7 +1227,7 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
ASSERT_TRUE(imageInfo1.isPending());
Future<slave::ImageInfo> imageInfo2 = store.get()->get(mesosImage);
- const list<pair<string, string>> result = {{"456", layerPath}};
+ const vector<string> result = {"456"};
ASSERT_TRUE(imageInfo2.isPending());
promise.set(result);
[3/3] mesos git commit: Removed the restriction that /tmp needs to be
writable in new rootfs.
Posted by ji...@apache.org.
Removed the restriction that /tmp needs to be writable in new rootfs.
Review: https://reviews.apache.org/r/43896
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14f070fd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14f070fd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14f070fd
Branch: refs/heads/master
Commit: 14f070fda25c98c0a8ba29da84c607f2dd86da6a
Parents: 66d0f44
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Feb 23 10:49:11 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:08 2016 -0800
----------------------------------------------------------------------
src/linux/fs.cpp | 33 +++++++++++++++++++++------------
1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/14f070fd/src/linux/fs.cpp
----------------------------------------------------------------------
diff --git a/src/linux/fs.cpp b/src/linux/fs.cpp
index 0df1942..7792f68 100644
--- a/src/linux/fs.cpp
+++ b/src/linux/fs.cpp
@@ -584,21 +584,25 @@ Try<Nothing> enter(const string& root)
return Error("Failed to create devices: " + create.error());
}
- // Create a /tmp directory if it doesn't exist.
- // TODO(idownes): Consider mounting a tmpfs to /tmp.
+ // Prepare /tmp in the new root. Note that we cannot assume that the
+ // new root is writable (i.e., it could be a read only filesystem).
+ // Therefore, we always mount a tmpfs on /tmp in the new root so
+ // that we can create the mount point for the old root.
if (!os::exists(path::join(root, "tmp"))) {
- Try<Nothing> mkdir = os::mkdir(path::join(root, "tmp"));
- if (mkdir.isError()) {
- return Error("Failed to create /tmp in chroot: " + mkdir.error());
- }
+ return Error("/tmp in chroot does not exist");
+ }
- Try<Nothing> chmod = os::chmod(
- path::join(root, "tmp"),
- S_IRWXU | S_IRWXG | S_IRWXO | S_ISVTX);
+ // TODO(jieyu): Consider limiting the size of the tmpfs.
+ mount = fs::mount(
+ "tmpfs",
+ path::join(root, "tmp"),
+ "tmpfs",
+ MS_NOSUID | MS_NOEXEC | MS_NODEV,
+ "mode=1777");
- if (chmod.isError()) {
- return Error("Failed to set mode on /tmp: " + chmod.error());
- }
+ if (mount.isError()) {
+ return Error("Failed to mount the temporary tmpfs at /tmp in new root: " +
+ mount.error());
}
// Create a mount point for the old root.
@@ -661,6 +665,11 @@ Try<Nothing> enter(const string& root)
// Check status when we stop using lazy umounts.
os::rmdir(relativeOld);
+ Try<Nothing> unmount = fs::unmount("/tmp");
+ if (unmount.isError()) {
+ return Error("Failed to umount /tmp in the chroot: " + unmount.error());
+ }
+
return Nothing();
}
[2/3] mesos git commit: Used uri::Fetcher to pull docker images in
docker registry puller.
Posted by ji...@apache.org.
Used uri::Fetcher to pull docker images in docker registry puller.
Review: https://reviews.apache.org/r/43860
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/66d0f449
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/66d0f449
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/66d0f449
Branch: refs/heads/master
Commit: 66d0f4496f146fb0c68d23e08d25a9af108c6218
Parents: caf6c02
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Feb 22 16:52:01 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Feb 23 15:08:08 2016 -0800
----------------------------------------------------------------------
docs/configuration.md | 17 -
docs/endpoints/slave/state.json.md | 2 -
docs/endpoints/slave/state.md | 2 -
.../mesos/provisioner/docker/puller.cpp | 7 +-
.../mesos/provisioner/docker/puller.hpp | 7 +-
.../provisioner/docker/registry_puller.cpp | 319 +++---
.../provisioner/docker/registry_puller.hpp | 7 +-
.../mesos/provisioner/docker/store.cpp | 11 +-
src/slave/flags.cpp | 10 -
src/slave/flags.hpp | 2 -
src/slave/http.cpp | 2 -
.../containerizer/provisioner_docker_tests.cpp | 978 +------------------
12 files changed, 220 insertions(+), 1144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b04e873..2353e78 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1057,15 +1057,6 @@ containerizer.
</tr>
<tr>
<td>
- --docker_auth_server=VALUE
- </td>
- <td>
-Docker authentication server used to authenticate with Docker registry
-(default: https://auth.docker.io)
- </td>
-</tr>
-<tr>
- <td>
--[no-]docker_kill_orphans
</td>
<td>
@@ -1090,14 +1081,6 @@ recovers.
</tr>
<tr>
<td>
- --docker_puller_timeout=VALUE
- </td>
- <td>
-Timeout in seconds for pulling images from the Docker registry (default: 60secs)
- </td>
-</tr>
-<tr>
- <td>
--docker_registry=VALUE
</td>
<td>
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/endpoints/slave/state.json.md
----------------------------------------------------------------------
diff --git a/docs/endpoints/slave/state.json.md b/docs/endpoints/slave/state.json.md
index 0a31159..a6403ee 100644
--- a/docs/endpoints/slave/state.json.md
+++ b/docs/endpoints/slave/state.json.md
@@ -80,12 +80,10 @@ Example (**Note**: this is not exhaustive):
"work_dir" : "/tmp/mesos",
"launcher_dir" : "/path/to/mesos/build/src",
"registration_backoff_factor" : "1secs",
- "docker_auth_server" : "https://auth.docker.io",
"oversubscribed_resources_interval" : "15secs",
"enforce_container_disk_quota" : "false",
"container_disk_watch_interval" : "15secs",
"disk_watch_interval" : "1mins",
- "docker_puller_timeout" : "60",
"cgroups_limit_swap" : "false",
"hostname_lookup" : "true",
"perf_duration" : "10secs",
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/docs/endpoints/slave/state.md
----------------------------------------------------------------------
diff --git a/docs/endpoints/slave/state.md b/docs/endpoints/slave/state.md
index 1077f69..bdd555e 100644
--- a/docs/endpoints/slave/state.md
+++ b/docs/endpoints/slave/state.md
@@ -80,12 +80,10 @@ Example (**Note**: this is not exhaustive):
"work_dir" : "/tmp/mesos",
"launcher_dir" : "/path/to/mesos/build/src",
"registration_backoff_factor" : "1secs",
- "docker_auth_server" : "https://auth.docker.io",
"oversubscribed_resources_interval" : "15secs",
"enforce_container_disk_quota" : "false",
"container_disk_watch_interval" : "15secs",
"disk_watch_interval" : "1mins",
- "docker_puller_timeout" : "60",
"cgroups_limit_swap" : "false",
"hostname_lookup" : "true",
"perf_duration" : "10secs",
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
index d012ae4..ac9dae8 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.cpp
@@ -22,13 +22,16 @@
#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
using process::Owned;
+using process::Shared;
namespace mesos {
namespace internal {
namespace slave {
namespace docker {
-Try<Owned<Puller>> Puller::create(const Flags& flags)
+Try<Owned<Puller>> Puller::create(
+ const Flags& flags,
+ const Shared<uri::Fetcher>& fetcher)
{
// TODO(tnachen): Support multiple registries in the puller.
if (strings::startsWith(flags.docker_registry, "/")) {
@@ -40,7 +43,7 @@ Try<Owned<Puller>> Puller::create(const Flags& flags)
return puller.get();
}
- Try<Owned<Puller>> puller = RegistryPuller::create(flags);
+ Try<Owned<Puller>> puller = RegistryPuller::create(flags, fetcher);
if (puller.isError()) {
return Error("Failed to create registry puller: " + puller.error());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
index 51894dd..191f3fc 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/puller.hpp
@@ -24,9 +24,12 @@
#include <process/future.hpp>
#include <process/owned.hpp>
+#include <process/shared.hpp>
#include <mesos/docker/spec.hpp>
+#include <mesos/uri/fetcher.hpp>
+
#include "slave/flags.hpp"
namespace mesos {
@@ -37,7 +40,9 @@ namespace docker {
class Puller
{
public:
- static Try<process::Owned<Puller>> create(const Flags& flags);
+ static Try<process::Owned<Puller>> create(
+ const Flags& flags,
+ const process::Shared<uri::Fetcher>& fetcher);
virtual ~Puller() {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
index d09e2ec..e08a959 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.cpp
@@ -14,17 +14,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <glog/logging.h>
+
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
+#include <process/http.hpp>
+#include <stout/os/exists.hpp>
#include <stout/os/mkdir.hpp>
#include <stout/os/rm.hpp>
+#include <stout/os/write.hpp>
#include "common/command_utils.hpp"
+#include "uri/schemes/docker.hpp"
+
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
namespace http = process::http;
@@ -38,68 +44,77 @@ using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
-using process::Promise;
+using process::Shared;
+
+using process::defer;
+using process::dispatch;
+using process::spawn;
+using process::wait;
namespace mesos {
namespace internal {
namespace slave {
namespace docker {
-using RegistryClient = registry::RegistryClient;
-
-
class RegistryPullerProcess : public Process<RegistryPullerProcess>
{
public:
- static Try<Owned<RegistryPullerProcess>> create(const Flags& flags);
+ RegistryPullerProcess(
+ const string& _storeDir,
+ const http::URL& _defaultRegistryUrl,
+ const Shared<uri::Fetcher>& _fetcher);
- process::Future<vector<string>> pull(
+ Future<vector<string>> pull(
const spec::ImageReference& reference,
const string& directory);
private:
- RegistryPullerProcess(
- const Owned<RegistryClient>& registry,
- const Duration& timeout);
-
- Future<vector<string>> downloadLayers(
- const spec::v2::ImageManifest& manifest,
+ Future<vector<string>> _pull(
const spec::ImageReference& reference,
const string& directory);
- Future<Nothing> downloadLayer(
+ Future<Nothing> fetchLayer(
const spec::ImageReference& reference,
const string& directory,
const string& blobSum,
- const string& layerId);
+ const spec::v1::ImageManifest& v1);
- Future<vector<string>> untarLayers(
+ Future<Nothing> _fetchLayer(
const string& directory,
- const vector<string>& layerIds);
-
- Future<Nothing> untarLayer(
- const string& directory,
- const string& layerId);
-
- Owned<RegistryClient> registryClient_;
- const Duration pullTimeout_;
- hashmap<string, Owned<Promise<Nothing>>> downloadTracker_;
+ const string& blobSum,
+ const spec::v1::ImageManifest& v1);
RegistryPullerProcess(const RegistryPullerProcess&) = delete;
RegistryPullerProcess& operator=(const RegistryPullerProcess&) = delete;
+
+ const string storeDir;
+
+ // If the user does not specify the registry url in the image
+ // reference, this registry url will be used as the default.
+ const http::URL defaultRegistryUrl;
+
+ Shared<uri::Fetcher> fetcher;
};
-Try<Owned<Puller>> RegistryPuller::create(const Flags& flags)
+Try<Owned<Puller>> RegistryPuller::create(
+ const Flags& flags,
+ const Shared<uri::Fetcher>& fetcher)
{
- Try<Owned<RegistryPullerProcess>> process =
- RegistryPullerProcess::create(flags);
-
- if (process.isError()) {
- return Error(process.error());
+ Try<http::URL> defaultRegistryUrl = http::URL::parse(flags.docker_registry);
+ if (defaultRegistryUrl.isError()) {
+ return Error(
+ "Failed to parse the default Docker registry: " +
+ defaultRegistryUrl.error());
}
- return Owned<Puller>(new RegistryPuller(process.get()));
+ Owned<RegistryPullerProcess> process(
+ new RegistryPullerProcess(
+ flags.docker_store_dir,
+ defaultRegistryUrl.get(),
+ fetcher));
+
+ return Owned<Puller>(new RegistryPuller(process));
}
@@ -113,7 +128,7 @@ RegistryPuller::RegistryPuller(Owned<RegistryPullerProcess> _process)
RegistryPuller::~RegistryPuller()
{
terminate(process.get());
- process::wait(process.get());
+ wait(process.get());
}
@@ -129,177 +144,159 @@ Future<vector<string>> RegistryPuller::pull(
}
-Try<Owned<RegistryPullerProcess>> RegistryPullerProcess::create(
- const Flags& flags)
-{
- Result<double> timeoutSecs = numify<double>(flags.docker_puller_timeout_secs);
- if ((timeoutSecs.isError()) || (timeoutSecs.get() <= 0)) {
- return Error(
- "Failed to create registry puller - invalid timeout value: " +
- flags.docker_puller_timeout_secs);
- }
-
- Try<http::URL> registryUrl = http::URL::parse(flags.docker_registry);
- if (registryUrl.isError()) {
- return Error("Failed to parse Docker registry: " + registryUrl.error());
- }
-
- Try<http::URL> authServerUrl = http::URL::parse(flags.docker_auth_server);
- if (authServerUrl.isError()) {
- return Error("Failed to parse Docker auth server: " +
- authServerUrl.error());
- }
-
- Try<Owned<RegistryClient>> registry = RegistryClient::create(
- registryUrl.get(), authServerUrl.get());
-
- if (registry.isError()) {
- return Error("Failed to create registry client: " + registry.error());
- }
-
- return Owned<RegistryPullerProcess>(new RegistryPullerProcess(
- registry.get(),
- Seconds(timeoutSecs.get())));
-}
-
-
RegistryPullerProcess::RegistryPullerProcess(
- const Owned<RegistryClient>& registry,
- const Duration& timeout)
- : registryClient_(registry),
- pullTimeout_(timeout) {}
+ const string& _storeDir,
+ const http::URL& _defaultRegistryUrl,
+ const Shared<uri::Fetcher>& _fetcher)
+ : storeDir(_storeDir),
+ defaultRegistryUrl(_defaultRegistryUrl),
+ fetcher(_fetcher) {}
Future<vector<string>> RegistryPullerProcess::pull(
const spec::ImageReference& reference,
const string& directory)
{
- // TODO(jojy): Have one outgoing manifest request per image.
- return registryClient_->getManifest(reference)
- .then(process::defer(self(), [=](const spec::v2::ImageManifest& manifest) {
- return downloadLayers(manifest, reference, directory);
- }))
- .then(process::defer(self(), [=](const vector<string>& layerIds) {
- return untarLayers(directory, layerIds);
- }))
- .after(pullTimeout_, [reference](Future<vector<string>> future) {
- future.discard();
- return Failure("Timed out");
- });
+ // TODO(jieyu): Consider introducing a 'normalize' function to
+ // normalize 'reference' here. For instance, we need to add
+ // 'library/' prefix if the user does not specify a repository.
+ // Also consider merging the registry generation logic below into
+ // 'normalize'.
+
+ URI manifestUri;
+ if (reference.has_registry()) {
+ // TODO(jieyu): The user specified registry might contain port. We
+ // need to parse it and set the 'scheme' and 'port' accordingly.
+ manifestUri = uri::docker::manifest(
+ reference.repository(),
+ (reference.has_tag() ? reference.tag() : "latest"),
+ reference.registry());
+ } else {
+ const string registry = defaultRegistryUrl.domain.isSome()
+ ? defaultRegistryUrl.domain.get()
+ : stringify(defaultRegistryUrl.ip.get());
+
+ const Option<int> port = defaultRegistryUrl.port.isSome()
+ ? static_cast<int>(defaultRegistryUrl.port.get())
+ : Option<int>();
+
+ manifestUri = uri::docker::manifest(
+ reference.repository(),
+ (reference.has_tag() ? reference.tag() : "latest"),
+ registry,
+ defaultRegistryUrl.scheme,
+ port);
+ }
+
+ return fetcher->fetch(manifestUri, directory)
+ .then(defer(self(), &Self::_pull, reference, directory));
}
-Future<vector<string>> RegistryPullerProcess::downloadLayers(
- const spec::v2::ImageManifest& manifest,
+Future<vector<string>> RegistryPullerProcess::_pull(
const spec::ImageReference& reference,
const string& directory)
{
+ Try<string> _manifest = os::read(path::join(directory, "manifest"));
+ if (_manifest.isError()) {
+ return Failure("Failed to read the manifest: " + _manifest.error());
+ }
+
+ Try<spec::v2::ImageManifest> manifest = spec::v2::parse(_manifest.get());
+ if (manifest.isError()) {
+ return Failure("Failed to parse the manifest: " + manifest.error());
+ }
+
list<Future<Nothing>> futures;
vector<string> layerIds;
- CHECK_EQ(manifest.fslayers_size(), manifest.history_size());
+ CHECK_EQ(manifest->fslayers_size(), manifest->history_size());
- for (int i = 0; i < manifest.fslayers_size(); i++) {
- CHECK(manifest.history(i).has_v1());
+ for (int i = 0; i < manifest->fslayers_size(); i++) {
+ CHECK(manifest->history(i).has_v1());
- layerIds.push_back(manifest.history(i).v1().id());
+ layerIds.push_back(manifest->history(i).v1().id());
- futures.push_back(downloadLayer(
+ futures.push_back(fetchLayer(
reference,
directory,
- manifest.fslayers(i).blobsum(),
- manifest.history(i).v1().id()));
+ manifest->fslayers(i).blobsum(),
+ manifest->history(i).v1()));
}
- // TODO(jojy): Delete downloaded files in the directory on discard and
- // failure?
- // TODO(jojy): Iterate through the futures and log the failed future.
return collect(futures)
.then([layerIds]() { return layerIds; });
}
-Future<Nothing> RegistryPullerProcess::downloadLayer(
+Future<Nothing> RegistryPullerProcess::fetchLayer(
const spec::ImageReference& reference,
const string& directory,
const string& blobSum,
- const string& layerId)
+ const spec::v1::ImageManifest& v1)
{
- VLOG(1) << "Downloading layer '" << layerId
- << "' for image '" << stringify(reference) << "'";
-
- if (downloadTracker_.contains(layerId)) {
- VLOG(1) << "Download already in progress for image '"
- << stringify(reference) << "', layer '" << layerId << "'";
-
- return downloadTracker_.at(layerId)->future();
+ // Check if the layer is in the store or not. If yes, skip the
+ // unnecessary fetching.
+ if (os::exists(paths::getImageLayerPath(storeDir, v1.id()))) {
+ return Nothing();
}
- Owned<Promise<Nothing>> downloadPromise(new Promise<Nothing>());
-
- downloadTracker_.insert({layerId, downloadPromise});
-
- const Path downloadFile(path::join(directory, layerId + ".tar"));
-
- registryClient_->getBlob(
- reference,
- blobSum,
- downloadFile)
- .onAny(process::defer(
- self(),
- [this, layerId, downloadPromise, downloadFile](
- const Future<size_t>& future) {
- downloadTracker_.erase(layerId);
-
- if (!future.isReady()) {
- downloadPromise->fail(
- "Failed to download layer '" + layerId + "': " +
- (future.isFailed() ? future.failure() : "future discarded"));
- } else if (future.get() == 0) {
- // We don't expect Docker registry to return empty response
- // even with empty layers.
- downloadPromise->fail(
- "Failed to download layer '" + layerId + "': no content");
- } else {
- downloadPromise->set(Nothing());
- }
- }));
-
- return downloadPromise->future();
-}
-
-
-Future<vector<string>> RegistryPullerProcess::untarLayers(
- const string& directory,
- const vector<string>& layerIds)
-{
- list<Future<Nothing>> futures;
- foreach (const string& layerId, layerIds) {
- VLOG(1) << "Untarring layer '" << layerId
- << "' downloaded from registry to directory '"
- << directory << "'";
-
- futures.emplace_back(untarLayer(directory, layerId));
+ VLOG(1) << "Fetching layer '" << v1.id() << "' for image '"
+ << reference << "'";
+
+ URI blobUri;
+ if (reference.has_registry()) {
+ // TODO(jieyu): The user specified registry might contain port. We
+ // need to parse it and set the 'scheme' and 'port' accordingly.
+ blobUri = uri::docker::blob(
+ reference.repository(),
+ blobSum,
+ reference.registry());
+ } else {
+ const string registry = defaultRegistryUrl.domain.isSome()
+ ? defaultRegistryUrl.domain.get()
+ : stringify(defaultRegistryUrl.ip.get());
+
+ const Option<int> port = defaultRegistryUrl.port.isSome()
+ ? static_cast<int>(defaultRegistryUrl.port.get())
+ : Option<int>();
+
+ blobUri = uri::docker::blob(
+ reference.repository(),
+ blobSum,
+ registry,
+ defaultRegistryUrl.scheme,
+ port);
}
- return collect(futures)
- .then([layerIds]() { return layerIds; });
+ return fetcher->fetch(blobUri, directory)
+ .then(defer(self(), &Self::_fetchLayer, directory, blobSum, v1));
}
-Future<Nothing> RegistryPullerProcess::untarLayer(
+Future<Nothing> RegistryPullerProcess::_fetchLayer(
const string& directory,
- const string& layerId)
+ const string& blobSum,
+ const spec::v1::ImageManifest& v1)
{
- const string layerPath = path::join(directory, layerId);
- const string tar = path::join(directory, layerId + ".tar");
+ const string layerPath = path::join(directory, v1.id());
+ const string tar = path::join(directory, blobSum);
const string rootfs = paths::getImageLayerRootfsPath(layerPath);
+ const string manifest = paths::getImageLayerManifestPath(layerPath);
- Try<Nothing> mkdir = os::mkdir(rootfs);
+ // NOTE: This will create 'layerPath' as well.
+ Try<Nothing> mkdir = os::mkdir(rootfs, true);
if (mkdir.isError()) {
return Failure(
- "Failed to create directory '" + rootfs + "'"
- ": " + mkdir.error());
+ "Failed to create rootfs directory '" + rootfs + "' "
+ "for layer '" + v1.id() + "': " + mkdir.error());
+ }
+
+ Try<Nothing> write = os::write(manifest, stringify(JSON::protobuf(v1)));
+ if (write.isError()) {
+ return Failure(
+ "Failed to save the layer manifest for layer '" +
+ v1.id() + "': " + write.error());
}
return command::untar(Path(tar), Path(rootfs))
@@ -308,8 +305,8 @@ Future<Nothing> RegistryPullerProcess::untarLayer(
Try<Nothing> rm = os::rm(tar);
if (rm.isError()) {
return Failure(
- "Failed to remove '" + tar + "' "
- "after extraction: " + rm.error());
+ "Failed to remove '" + tar + "' "
+ "after extraction: " + rm.error());
}
return Nothing();
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
index c429df9..bbd6005 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/registry_puller.hpp
@@ -18,9 +18,12 @@
#define __PROVISIONER_DOCKER_REGISTRY_PULLER_HPP__
#include <process/owned.hpp>
+#include <process/shared.hpp>
#include <stout/try.hpp>
+#include <mesos/uri/fetcher.hpp>
+
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
#include "slave/flags.hpp"
@@ -39,7 +42,9 @@ class RegistryPullerProcess;
class RegistryPuller : public Puller
{
public:
- static Try<process::Owned<Puller>> create(const Flags& flags);
+ static Try<process::Owned<Puller>> create(
+ const Flags& flags,
+ const process::Shared<uri::Fetcher>& fetcher);
~RegistryPuller();
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/containerizer/mesos/provisioner/docker/store.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index d802a79..47d3dc1 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -34,6 +34,8 @@
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
+#include "uri/fetcher.hpp"
+
using namespace process;
namespace spec = docker::spec;
@@ -89,7 +91,14 @@ private:
Try<Owned<slave::Store>> Store::create(const Flags& flags)
{
- Try<Owned<Puller>> puller = Puller::create(flags);
+ // TODO(jieyu): We should inject URI fetcher from top level, instead
+ // of creating it here.
+ Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
+ if (fetcher.isError()) {
+ return Error("Failed to create the URI fetcher: " + fetcher.error());
+ }
+
+ Try<Owned<Puller>> puller = Puller::create(flags, fetcher->share());
if (puller.isError()) {
return Error("Failed to create Docker puller: " + puller.error());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 855812e..1c6a87b 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -124,16 +124,6 @@ mesos::internal::slave::Flags::Flags()
"Directory the appc provisioner will store images in.\n",
"/tmp/mesos/store/appc");
- add(&Flags::docker_auth_server,
- "docker_auth_server",
- "Docker authentication server used to authenticate with Docker registry",
- "https://auth.docker.io");
-
- add(&Flags::docker_puller_timeout_secs,
- "docker_puller_timeout",
- "Timeout in seconds for pulling images from the Docker registry",
- "60");
-
add(&Flags::docker_registry,
"docker_registry",
"The default url for pulling Docker images. It could either be a Docker\n"
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 54c1a69..c079321 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -53,8 +53,6 @@ public:
std::string appc_simple_discovery_uri_prefix;
std::string appc_store_dir;
- std::string docker_auth_server;
- std::string docker_puller_timeout_secs;
std::string docker_registry;
std::string docker_store_dir;
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index a18085e..4eb1faf 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -465,12 +465,10 @@ string Slave::Http::STATE_HELP() {
" \"work_dir\" : \"/tmp/mesos\",",
" \"launcher_dir\" : \"/path/to/mesos/build/src\",",
" \"registration_backoff_factor\" : \"1secs\",",
- " \"docker_auth_server\" : \"https://auth.docker.io\",",
" \"oversubscribed_resources_interval\" : \"15secs\",",
" \"enforce_container_disk_quota\" : \"false\",",
" \"container_disk_watch_interval\" : \"15secs\",",
" \"disk_watch_interval\" : \"1mins\",",
- " \"docker_puller_timeout\" : \"60\",",
" \"cgroups_limit_swap\" : \"false\",",
" \"hostname_lookup\" : \"true\",",
" \"perf_duration\" : \"10secs\",",
http://git-wip-us.apache.org/repos/asf/mesos/blob/66d0f449/src/tests/containerizer/provisioner_docker_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer/provisioner_docker_tests.cpp b/src/tests/containerizer/provisioner_docker_tests.cpp
index c6b1e69..b3c6f88 100644
--- a/src/tests/containerizer/provisioner_docker_tests.cpp
+++ b/src/tests/containerizer/provisioner_docker_tests.cpp
@@ -14,984 +14,50 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <utility>
-
#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <stout/duration.hpp>
-
#include <stout/gtest.hpp>
#include <stout/json.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/stringify.hpp>
-#include <process/address.hpp>
-#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
-#include <process/io.hpp>
#include <process/owned.hpp>
-#include <process/socket.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/ssl/gtest.hpp>
#include <mesos/docker/spec.hpp>
#include "slave/containerizer/mesos/provisioner/docker/metadata_manager.hpp"
#include "slave/containerizer/mesos/provisioner/docker/paths.hpp"
#include "slave/containerizer/mesos/provisioner/docker/puller.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/registry_client.hpp"
#include "slave/containerizer/mesos/provisioner/docker/registry_puller.hpp"
#include "slave/containerizer/mesos/provisioner/docker/store.hpp"
-#include "slave/containerizer/mesos/provisioner/docker/token_manager.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
-namespace io = process::io;
+namespace paths = mesos::internal::slave::docker::paths;
namespace slave = mesos::internal::slave;
namespace spec = ::docker::spec;
-using std::list;
-using std::map;
-using std::pair;
using std::string;
using std::vector;
-using process::Clock;
using process::Future;
using process::Owned;
using process::Promise;
-using process::Subprocess;
-using process::network::Socket;
+using slave::ImageInfo;
using slave::docker::Puller;
using slave::docker::RegistryPuller;
-
-using slave::docker::paths::getImageLayerRootfsPath;
-
-using slave::docker::registry::RegistryClient;
-using slave::docker::registry::Token;
-using slave::docker::registry::TokenManager;
+using slave::docker::Store;
namespace mesos {
namespace internal {
namespace tests {
-/**
- * Provides token operations and defaults.
- */
-class TokenHelper {
-protected:
- const string hdrBase64 = base64::encode(
- "{ \
- \"alg\":\"ES256\", \
- \"typ\":\"JWT\", \
- \"x5c\":[\"test\"] \
- }");
-
- string getClaimsBase64() const
- {
- return base64::encode(claimsJsonString);
- }
-
- string getTokenString() const
- {
- return hdrBase64 + "." + getClaimsBase64() + "." + signBase64;
- }
-
- string getDefaultTokenString()
- {
- // Construct response and send(server side).
- const double expirySecs = Clock::now().secs() + Days(365).secs();
-
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"exp\":" + stringify(expirySecs) + ", \
- \"iat\":1438887168, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":1438887166, \
- \"sub\":\"\" \
- }";
-
- return getTokenString();
- }
-
- const string signBase64 = base64::encode("{\"\"}");
- string claimsJsonString;
-};
-
-
-/**
- * Fixture for testing TokenManager component.
- */
-class RegistryTokenTest : public TokenHelper, public ::testing::Test {};
-
-
-// Tests JSON Web Token parsing for a valid token string.
-TEST_F(RegistryTokenTest, ValidToken)
-{
- const double expirySecs = Clock::now().secs() + Days(365).secs();
-
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"exp\":" + stringify(expirySecs) + ", \
- \"iat\":1438887168, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":1438887166, \
- \"sub\":\"\" \
- }";
-
- Try<Token> token = Token::create(getTokenString());
-
- ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with expiration
-// date in the past.
-TEST_F(RegistryTokenTest, ExpiredToken)
-{
- // Use an arbitrary fixed date that is far in the past (12 weeks
- // after the Unix epoch).
- const double expirySecs = Weeks(12).secs();
-
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"exp\":" + stringify(expirySecs) + ", \
- \"iat\":1438887166, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":1438887166, \
- \"sub\":\"\" \
- }";
-
- Try<Token> token = Token::create(getTokenString());
-
- EXPECT_ERROR(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with no expiration date.
-TEST_F(RegistryTokenTest, NoExpiration)
-{
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"iat\":1438887166, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":1438887166, \
- \"sub\":\"\" \
- }";
-
- const Try<Token> token = Token::create(getTokenString());
-
- ASSERT_SOME(token);
-}
-
-
-// Tests JSON Web Token parsing for a token string with not-before date in the
-// future.
-TEST_F(RegistryTokenTest, NotBeforeInFuture)
-{
- const double expirySecs = Clock::now().secs() + Days(365).secs();
- const double nbfSecs = Clock::now().secs() + Days(7).secs();
-
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"exp\":" + stringify(expirySecs) + ", \
- \"iat\":1438887166, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":" + stringify(nbfSecs) + ", \
- \"sub\":\"\" \
- }";
-
- const Try<Token> token = Token::create(getTokenString());
-
- ASSERT_SOME(token);
- ASSERT_EQ(token.get().isValid(), false);
-}
-
-
-#ifdef USE_SSL_SOCKET
-
-// Test suite for docker registry tests.
-class RegistryClientTest : public virtual SSLTest, public TokenHelper
-{
-protected:
- RegistryClientTest() {}
-
- Try<Socket> getServer() {
- return setup_server({
- {"SSL_ENABLED", "true"},
- {"SSL_KEY_FILE", key_path().value},
- {"SSL_CERT_FILE", certificate_path().value}});
- }
-};
-
-
-// Tests TokenManager for a simple token request.
-TEST_F(RegistryClientTest, SimpleGetToken)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- // Create URL from server hostname and port.
- const process::http::URL url(
- "https",
- server.get().address().get().hostname().get(),
- server.get().address().get().port);
-
- Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
- ASSERT_SOME(tokenMgr);
-
- Future<Token> token =
- tokenMgr.get()->getToken(
- "registry.docker.io",
- "repository:library/busybox:pull",
- None());
-
- AWAIT_ASSERT_READY(socket);
-
- // Construct response and send(server side).
- const double expirySecs = Clock::now().secs() + Days(365).secs();
-
- claimsJsonString =
- "{\"access\" \
- :[ \
- { \
- \"type\":\"repository\", \
- \"name\":\"library/busybox\", \
- \"actions\":[\"pull\"]}], \
- \"aud\":\"registry.docker.io\", \
- \"exp\":" + stringify(expirySecs) + ", \
- \"iat\":1438887168, \
- \"iss\":\"auth.docker.io\", \
- \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
- \"nbf\":1438887166, \
- \"sub\":\"\" \
- }";
-
- const string tokenString(getTokenString());
- const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
- const string buffer =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(tokenResponse.length()) + "\r\n" +
- "\r\n" +
- tokenResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
- AWAIT_ASSERT_READY(token);
- ASSERT_EQ(token.get().raw, tokenString);
-}
-
-
-// Tests TokenManager for bad token response from server.
-TEST_F(RegistryClientTest, BadTokenResponse)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- // Create URL from server hostname and port.
- const process::http::URL url(
- "https",
- server.get().address().get().hostname().get(),
- server.get().address().get().port);
-
- Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
- ASSERT_SOME(tokenMgr);
-
- Future<Token> token =
- tokenMgr.get()->getToken(
- "registry.docker.io",
- "repository:library/busybox:pull",
- None());
-
- AWAIT_ASSERT_READY(socket);
-
- const string tokenString("bad token");
- const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
-
- const string buffer =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(tokenResponse.length()) + "\r\n" +
- "\r\n" +
- tokenResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
-
- AWAIT_FAILED(token);
-}
-
-
-// Tests TokenManager for request to invalid server.
-TEST_F(RegistryClientTest, DISABLED_BadTokenServerAddress)
-{
- // Create an invalid URL with current time.
- const process::http::URL url("https", stringify(Clock::now().secs()), 0);
-
- Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
- ASSERT_SOME(tokenMgr);
-
- Future<Token> token =
- tokenMgr.get()->getToken(
- "registry.docker.io",
- "repository:library/busybox:pull",
- None());
-
- AWAIT_FAILED(token);
-}
-
-
-// Tests docker registry's getManifest API.
-TEST_F(RegistryClientTest, SimpleGetManifest)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- const process::http::URL url(
- "https",
- server.get().address().get().hostname().get(),
- server.get().address().get().port);
-
- Try<Owned<RegistryClient>> registryClient =
- RegistryClient::create(url, url, None());
-
- ASSERT_SOME(registryClient);
-
- Try<spec::ImageReference> reference =
- spec::parseImageReference("library/busybox");
- ASSERT_SOME(reference);
-
- Future<spec::v2::ImageManifest> manifestResponse =
- registryClient.get()->getManifest(reference.get());
-
- const string unauthResponseHeaders = "Www-Authenticate: Bearer"
- " realm=\"https://auth.docker.io/token\","
- "service=" + stringify(server.get().address().get()) + ","
- "scope=\"repository:library/busybox:pull\"";
-
- const string unauthHttpResponse =
- string("HTTP/1.1 401 Unauthorized\r\n") +
- unauthResponseHeaders + "\r\n" +
- "\r\n";
-
- AWAIT_ASSERT_READY(socket);
-
- // Send 401 Unauthorized response for a manifest request.
- Future<string> manifestHttpRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(manifestHttpRequest);
- AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
- // Token response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- Future<string> tokenRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(tokenRequest);
-
- const string tokenResponse =
- "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
- const string tokenHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(tokenResponse.length()) + "\r\n" +
- "\r\n" +
- tokenResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
- // Manifest response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- manifestHttpRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(manifestHttpRequest);
-
- const string manifestJSON =
- "{"
- " \"schemaVersion\": 1,"
- " \"name\": \"library/busybox\","
- " \"tag\": \"latest\","
- " \"architecture\": \"amd64\","
- " \"fsLayers\": ["
- " {"
- " \"blobSum\": "
- "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
- " },"
- " {"
- " \"blobSum\": "
- "\"sha256:1db09adb5ddd7f1a07b6d585a7db747a51c7bd17418d47e91f901bdf420abd66\""
- " },"
- " {"
- " \"blobSum\": "
- "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
- " }"
- " ],"
- " \"history\": ["
- " {"
- " \"v1Compatibility\": "
- " \"{"
- " \\\"id\\\": "
- "\\\"1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
- " \\\"parent\\\": "
- "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
- " }\""
- " },"
- " {"
- " \"v1Compatibility\": "
- " \"{"
- " \\\"id\\\": "
- "\\\"2ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
- " \\\"parent\\\": "
- "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
- " }\""
- " },"
- " {"
- " \"v1Compatibility\": "
- " \"{"
- " \\\"id\\\": "
- "\\\"3ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
- " \\\"parent\\\": "
- "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
- " }\""
- " }"
- " ],"
- " \"signatures\": ["
- " {"
- " \"header\": {"
- " \"jwk\": {"
- " \"crv\": \"P-256\","
- " \"kid\": "
- "\"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\","
- " \"kty\": \"EC\","
- " \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\","
- " \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\""
- " },"
- " \"alg\": \"ES256\""
- " },"
- " \"signature\": \"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgO"
- "EarAs7_4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\","
- " \"protected\": \"eyJmb3JtYXRMZW5ndGgiOjYwNjMsImZvcm1hdFRhaWwiOiJ"
- "DbjAiLCJ0aW1lIjoiMjAxNC0wOS0xMVQxNzoxNDozMFoifQ\""
- " }"
- " ]"
- "}";
-
- const string manifestHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(manifestJSON.length()) + "\r\n" +
- "Docker-Content-Digest: "
- "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
- + "\r\n" +
- "\r\n" +
- manifestJSON;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
-
- AWAIT_ASSERT_READY(manifestResponse);
-
- EXPECT_EQ(
- manifestResponse.get().history(2).v1().id(),
- "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
- EXPECT_EQ(
- manifestResponse.get().history(1).v1().id(),
- "2ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
- EXPECT_EQ(
- manifestResponse.get().history(0).v1().id(),
- "3ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-}
-
-
-// Tests docker registry's getBlob API.
-TEST_F(RegistryClientTest, SimpleGetBlob)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- Try<Socket> blobServer = getServer();
-
- ASSERT_SOME(blobServer);
- ASSERT_SOME(blobServer.get().address());
- ASSERT_SOME(blobServer.get().address().get().hostname());
-
- Future<Socket> blobServerAcceptSocket = blobServer.get().accept();
-
- const process::http::URL url(
- "https",
- server.get().address().get().hostname().get(),
- server.get().address().get().port);
-
- Try<Owned<RegistryClient>> registryClient =
- RegistryClient::create(url, url, None());
-
- ASSERT_SOME(registryClient);
-
- const Path blobPath(path::join(os::getcwd(), "blob"));
-
- Try<spec::ImageReference> reference = spec::parseImageReference("blob");
- ASSERT_SOME(reference);
-
- Future<size_t> result =
- registryClient.get()->getBlob(
- reference.get(),
- "digest",
- blobPath);
-
- const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
- " realm=\"https://auth.docker.io/token\","
- "service=" + stringify(server.get().address().get()) + ","
- "scope=\"repository:library/busybox:pull\"";
-
- const string unauthHttpResponse =
- string("HTTP/1.1 401 Unauthorized\r\n") +
- unauthResponseHeaders + "\r\n" +
- "\r\n";
-
- AWAIT_ASSERT_READY(socket);
-
- // Send 401 Unauthorized response.
- Future<string> blobHttpRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(blobHttpRequest);
- AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
- // Send token response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- Future<string> tokenRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(tokenRequest);
-
- const string tokenResponse =
- "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
- const string tokenHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(tokenResponse.length()) + "\r\n" +
- "\r\n" +
- tokenResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
- // Send redirect.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- blobHttpRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(blobHttpRequest);
-
- const string redirectHttpResponse =
- string("HTTP/1.1 307 Temporary Redirect\r\n") +
- "Location: https://" +
- blobServer.get().address().get().hostname().get() + ":" +
- stringify(blobServer.get().address().get().port) + "/blob \r\n" +
- "\r\n";
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
-
- // Finally send blob response.
- AWAIT_ASSERT_READY(blobServerAcceptSocket);
-
- blobHttpRequest = Socket(blobServerAcceptSocket.get()).recv();
- AWAIT_ASSERT_READY(blobHttpRequest);
-
- const string blobResponse = stringify(Clock::now());
-
- const string blobHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(blobResponse.length()) + "\r\n" +
- "\r\n" +
- blobResponse;
-
- AWAIT_ASSERT_READY(Socket(blobServerAcceptSocket.get()).send(
- blobHttpResponse));
-
- AWAIT_ASSERT_READY(result);
-
- Try<string> blob = os::read(blobPath);
- ASSERT_SOME(blob);
- ASSERT_EQ(blob.get(), blobResponse);
-}
-
-
-TEST_F(RegistryClientTest, BadRequest)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- const process::http::URL url(
- "https",
- server.get().address().get().hostname().get(),
- server.get().address().get().port);
-
- Try<Owned<RegistryClient>> registryClient =
- RegistryClient::create(url, url, None());
-
- ASSERT_SOME(registryClient);
-
- const Path blobPath(path::join(os::getcwd(), "blob"));
-
- Try<spec::ImageReference> reference = spec::parseImageReference("blob");
- ASSERT_SOME(reference);
-
- Future<size_t> result =
- registryClient.get()->getBlob(
- reference.get(),
- "digest",
- blobPath);
-
- const string badRequestResponse =
- "{\"errors\": [{\"message\": \"Error1\" }, {\"message\": \"Error2\"}]}";
-
- const string badRequestHttpResponse =
- string("HTTP/1.1 400 Bad Request\r\n") +
- "Content-Length : " + stringify(badRequestResponse.length()) + "\r\n" +
- "\r\n" +
- badRequestResponse;
-
- AWAIT_ASSERT_READY(socket);
-
- // Send 400 Bad Request.
- Future<string> blobHttpRequest = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(blobHttpRequest);
- AWAIT_ASSERT_READY(Socket(socket.get()).send(badRequestHttpResponse));
-
- AWAIT_FAILED(result);
-
- ASSERT_TRUE(strings::contains(result.failure(), "Error1"));
- ASSERT_TRUE(strings::contains(result.failure(), "Error2"));
-}
-
-
-// Tests docker RegistryPuller component. It simulates pulling an image layer
-// from remote registry and then verifies the content saved on disk.
-TEST_F(RegistryClientTest, SimpleRegistryPuller)
-{
- Try<Socket> server = getServer();
-
- ASSERT_SOME(server);
- ASSERT_SOME(server.get().address());
- ASSERT_SOME(server.get().address().get().hostname());
-
- Future<Socket> socket = server.get().accept();
-
- Try<Socket> blobServer = getServer();
-
- ASSERT_SOME(blobServer);
- ASSERT_SOME(blobServer.get().address());
- ASSERT_SOME(blobServer.get().address().get().hostname());
-
- Future<Socket> blobServerAcceptSock = blobServer.get().accept();
-
- slave::Flags flags;
- process::network::Address address = server.get().address().get();
- const string url = "https://" + address.hostname().get() + ":" +
- stringify(address.port);
- flags.docker_registry = url;
- flags.docker_auth_server = url;
-
- Try<Owned<Puller>> registryPuller = RegistryPuller::create(flags);
- ASSERT_SOME(registryPuller);
-
- const string registryPullerPath = os::getcwd();
-
- Try<spec::ImageReference> reference = spec::parseImageReference("busybox");
- ASSERT_SOME(reference);
-
- Future<vector<string>> registryPullerFuture =
- registryPuller.get()->pull(reference.get(), registryPullerPath);
-
- const string unauthResponseHeaders = "WWW-Authenticate: Bearer"
- " realm=\"https://auth.docker.io/token\","
- "service=" + stringify(server.get().address().get()) + ","
- "scope=\"repository:library/busybox:pull\"";
-
- const string unauthHttpResponse =
- string("HTTP/1.1 401 Unauthorized\r\n") +
- unauthResponseHeaders + "\r\n" +
- "\r\n";
-
- AWAIT_ASSERT_READY(socket);
-
- // Send 401 Unauthorized response for a manifest request.
- Future<string> registryPullerHttpRequestFuture = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
- AWAIT_ASSERT_READY(Socket(socket.get()).send(unauthHttpResponse));
-
- // Token response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- Future<string> tokenRequestFuture = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(tokenRequestFuture);
-
- const string tokenResponse =
- "{\"token\":\"" + getDefaultTokenString() + "\"}";
-
- const string tokenHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(tokenResponse.length()) + "\r\n" +
- "\r\n" +
- tokenResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(tokenHttpResponse));
-
- // Manifest response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- registryPullerHttpRequestFuture = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
- const string manifestResponse =
- "{"
- " \"schemaVersion\": 1,"
- " \"name\": \"library/busybox\","
- " \"tag\": \"latest\","
- " \"architecture\": \"amd64\","
- " \"fsLayers\": ["
- " {"
- " \"blobSum\": "
- "\"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4\""
- " }"
- " ],"
- " \"history\": ["
- " {"
- " \"v1Compatibility\": "
- " \"{"
- " \\\"id\\\": "
- "\\\"1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea\\\","
- " \\\"parent\\\": "
- "\\\"cf2616975b4a3cba083ca99bc3f0bf25f5f528c3c52be1596b30f60b0b1c37ff\\\""
- " }\""
- " }"
- " ],"
- " \"signatures\": ["
- " {"
- " \"header\": {"
- " \"jwk\": {"
- " \"crv\": \"P-256\","
- " \"kid\": "
- "\"OOI5:SI3T:LC7D:O7DX:FY6S:IAYW:WDRN:VQEM:BCFL:OIST:Q3LO:GTQQ\","
- " \"kty\": \"EC\","
- " \"x\": \"J2N5ePGhlblMI2cdsR6NrAG_xbNC_X7s1HRtk5GXvzM\","
- " \"y\": \"Idr-tEBjnNnfq6_71aeXBi3Z9ah_rrE209l4wiaohk0\""
- " },"
- " \"alg\": \"ES256\""
- " },"
- " \"signature\": \"65vq57TakC_yperuhfefF4uvTbKO2L45gYGDs5bIEgO"
- "EarAs7_4dbEV5u-W7uR8gF6EDKfowUCmTq3a5vEOJ3w\","
- " \"protected\": \"eyJmb3JtYXRMZW5ndGgiOjYwNjMsImZvcm1hdFRhaWwiOiJ"
- "DbjAiLCJ0aW1lIjoiMjAxNC0wOS0xMVQxNzoxNDozMFoifQ\""
- " }"
- " ]"
- "}";
-
- const string manifestHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-Length : " +
- stringify(manifestResponse.length()) + "\r\n" +
- "Docker-Content-Digest: "
- "sha256:df9e13f36d2d5b30c16bfbf2a6110c45ebed0bfa1ea42d357651bc6c736d5322"
- + "\r\n" +
- "\r\n" +
- manifestResponse;
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(manifestHttpResponse));
-
- // Redirect response.
- socket = server.get().accept();
- AWAIT_ASSERT_READY(socket);
-
- registryPullerHttpRequestFuture = Socket(socket.get()).recv();
- AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
- const string redirectHttpResponse =
- string("HTTP/1.1 307 Temporary Redirect\r\n") +
- "Content-Length : 0\r\n" +
- "Location: https://" +
- blobServer.get().address().get().hostname().get() + ":" +
- stringify(blobServer.get().address().get().port) + "/blob \r\n" +
- "\r\n";
-
- AWAIT_ASSERT_READY(Socket(socket.get()).send(redirectHttpResponse));
-
- AWAIT_ASSERT_READY(blobServerAcceptSock);
-
- registryPullerHttpRequestFuture = Socket(blobServerAcceptSock.get()).recv();
- AWAIT_ASSERT_READY(registryPullerHttpRequestFuture);
-
- // Prepare the blob response from the server. The blob response buffer is a
- // tarball. So we create a tarball of our test response and send that.
- const string blobFile = "blob";
- const string blobResponse = "hello docker";
-
- Path blobPath(path::join(registryPullerPath, blobFile));
- ASSERT_SOME(os::write(blobPath, blobResponse));
-
- Path blobTarPath(path::join(registryPullerPath, blobFile + ".tar"));
-
- vector<string> argv = {
- "tar",
- "-C",
- registryPullerPath,
- "-c",
- "-f",
- blobTarPath,
- blobFile
- };
-
- Try<Subprocess> s = subprocess(
- "tar",
- argv,
- Subprocess::PATH("/dev/null"),
- Subprocess::PATH("/dev/null"),
- Subprocess::PATH("/dev/null"));
-
- ASSERT_SOME(s);
- AWAIT_ASSERT_READY(s.get().status());
-
- Try<Bytes> tarSize = os::stat::size(blobTarPath);
- ASSERT_SOME(tarSize);
-
- ASSERT_SOME(os::rm(blobPath));
-
- std::unique_ptr<char[]> tarBuffer(new char[tarSize.get().bytes()]);
- ASSERT_NE(tarBuffer.get(), nullptr);
-
- Try<int> fd = os::open(
- blobTarPath,
- O_RDONLY,
- S_IRUSR | S_IRGRP | S_IROTH);
- ASSERT_SOME(fd);
-
- ASSERT_SOME(os::nonblock(fd.get()));
-
- AWAIT_ASSERT_READY(
- io::read(fd.get(), tarBuffer.get(), tarSize.get().bytes()));
-
- ASSERT_SOME(os::close(fd.get()));
-
- const string blobHttpResponse =
- string("HTTP/1.1 200 OK\r\n") +
- "Content-type : application/octet-stream\r\n" +
- "Content-Length : " +
- stringify(tarSize.get().bytes()) + "\r\n" +
- "\r\n";
-
- const size_t blobResponseSize =
- blobHttpResponse.length() + tarSize.get().bytes();
-
- std::unique_ptr<char[]> responseBuffer(new char[blobResponseSize]);
- ASSERT_NE(responseBuffer.get(), nullptr);
-
- memcpy(
- responseBuffer.get(),
- blobHttpResponse.c_str(),
- blobHttpResponse.length());
-
- memcpy(
- responseBuffer.get() + blobHttpResponse.length(),
- tarBuffer.get(),
- tarSize.get().bytes());
-
- AWAIT_ASSERT_READY(Socket(blobServerAcceptSock.get()).send(
- responseBuffer.get(),
- blobResponseSize));
-
- AWAIT_ASSERT_READY(registryPullerFuture);
- vector<string> layers = registryPullerFuture.get();
- ASSERT_EQ(1u, layers.size());
- ASSERT_EQ(layers.front(),
- "1ce2e90b0bc7224de3db1f0d646fe8e2c4dd37f1793928287f6074bc451a57ea");
-
- Try<string> blob = os::read(path::join(
- registryPullerPath,
- layers.front(),
- "rootfs",
- blobFile));
-
- ASSERT_SOME(blob);
- ASSERT_EQ(blob.get(), blobResponse);
-}
-
-#endif // USE_SSL_SOCKET
-
-
class ProvisionerDockerLocalStoreTest : public TemporaryDirectoryTest
{
public:
@@ -999,20 +65,22 @@ public:
const slave::Flags& flags,
const vector<string>& layers)
{
- const string layersPath = path::join(flags.docker_store_dir, "layers");
-
// Verify contents of the image in store directory.
- const string layerPath1 =
- getImageLayerRootfsPath(flags.docker_store_dir, "123");
+ const string layerPath1 = paths::getImageLayerRootfsPath(
+ flags.docker_store_dir,
+ "123");
- const string layerPath2 =
- getImageLayerRootfsPath(flags.docker_store_dir, "456");
+ const string layerPath2 = paths::getImageLayerRootfsPath(
+ flags.docker_store_dir,
+ "456");
EXPECT_TRUE(os::exists(layerPath1));
EXPECT_TRUE(os::exists(layerPath2));
+
EXPECT_SOME_EQ(
"foo 123",
os::read(path::join(layerPath1 , "temp")));
+
EXPECT_SOME_EQ(
"bar 456",
os::read(path::join(layerPath2, "temp")));
@@ -1238,6 +306,30 @@ TEST_F(ProvisionerDockerLocalStoreTest, PullingSameImageSimutanuously)
EXPECT_EQ(imageInfo1.get().layers, imageInfo2.get().layers);
}
+
+class ProvisionerDockerRegistryPullerTest : public TemporaryDirectoryTest {};
+
+
+TEST_F(ProvisionerDockerRegistryPullerTest, INTERNET_CURL_Pull)
+{
+ slave::Flags flags;
+ flags.docker_registry = "https://registry-1.docker.io";
+ flags.docker_store_dir = os::getcwd();
+
+ Try<Owned<slave::Store>> store = Store::create(flags);
+ ASSERT_SOME(store);
+
+ Image image;
+ image.set_type(Image::DOCKER);
+ image.mutable_docker()->set_name("library/alpine");
+
+ Future<ImageInfo> imageInfo = store.get()->get(image);
+ AWAIT_READY_FOR(imageInfo, Seconds(60));
+
+ EXPECT_LE(1u, imageInfo->layers.size());
+ EXPECT_SOME(imageInfo->dockerManifest);
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {