You are viewing a plain-text version of this content. The canonical link for it is here (the link target was not preserved in this plain-text rendering).
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/01 15:46:02 UTC
[4/4] mesos git commit: Final comments from benh on Fetcher Cache.
Final comments from benh on Fetcher Cache.
See https://reviews.apache.org/r/30774.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7aede4ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7aede4ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7aede4ad
Branch: refs/heads/master
Commit: 7aede4ad40b500e6a9dc5ead4a718d415bf5d889
Parents: edd35b0
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat May 23 18:13:51 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Mon Jun 1 06:45:29 2015 -0700
----------------------------------------------------------------------
src/slave/containerizer/fetcher.cpp | 241 +++++++++++++++----------------
src/slave/containerizer/fetcher.hpp | 28 +---
src/tests/fetcher_cache_tests.cpp | 40 +++--
src/tests/mesos.cpp | 4 +-
src/tests/mesos.hpp | 6 +-
5 files changed, 146 insertions(+), 173 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index c519bff..d4f127a 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -376,27 +376,20 @@ Future<Nothing> FetcherProcess::fetch(
newEntry->reference();
- entries[uri] = async(&fetchSize, uri.value(), flags.frameworks_home)
- .then(defer(self(),
- &FetcherProcess::reserveCacheSpace,
- lambda::_1,
- newEntry));
- }
- }
-
- list<Future<shared_ptr<Cache::Entry>>> futures;
-
- // Get out all of the futures we need to wait for so we can wait on them
- // together via 'await'.
- foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
- entries) {
- if (entry.isSome()) {
- futures.push_back(entry.get());
+ entries[uri] =
+ async([=]() {
+ return fetchSize(uri.value(), flags.frameworks_home);
+ })
+ .then(defer(self(), [=](const Try<Bytes>& requestedSpace) {
+ return reserveCacheSpace(requestedSpace, newEntry);
+ }));
}
}
- return _fetch(futures,
- entries,
+ // NOTE: We explicitly call the continuation '_fetch' even though it
+ // looks like we could easily inline it here because we want to be
+ // able to mock the function for testing! Don't remove this!
+ return _fetch(entries,
containerId,
sandboxDirectory,
cacheDirectory,
@@ -406,7 +399,6 @@ Future<Nothing> FetcherProcess::fetch(
Future<Nothing> FetcherProcess::_fetch(
- const list<Future<shared_ptr<Cache::Entry>>> futures,
const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
entries,
const ContainerID& containerId,
@@ -415,57 +407,61 @@ Future<Nothing> FetcherProcess::_fetch(
const Option<string>& user,
const Flags& flags)
{
- return await(futures)
- .then(defer(self(),
- &FetcherProcess::__fetch,
- entries))
- .then(defer(self(),
- &FetcherProcess::___fetch,
- lambda::_1,
- containerId,
- sandboxDirectory,
- cacheDirectory,
- user,
- flags));
-}
-
-
-// For each URI, if there is a cache entry and waiting for it was
-// successful, extract it and add it to the resulting map. Otherwise
-// we'll assume we are not using or cannot use the cache for this URI.
-Future<hashmap<CommandInfo::URI,
- Option<shared_ptr<FetcherProcess::Cache::Entry>>>>
-FetcherProcess::__fetch(
- const hashmap<CommandInfo::URI,
- Option<Future<shared_ptr<Cache::Entry>>>>& entries)
-{
- hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+ // Get out all of the futures we need to wait for so we can wait on
+ // them together via 'await'.
+ list<Future<shared_ptr<Cache::Entry>>> futures;
- foreachpair (const CommandInfo::URI& uri,
- const Option<Future<shared_ptr<Cache::Entry>>>& entry,
- entries) {
+ foreachvalue (const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+ entries) {
if (entry.isSome()) {
- if (entry.get().isReady()) {
- result[uri] = entry.get().get();
- } else {
- LOG(WARNING) << "Reverting to fetching directly into the sandbox for '"
- << uri.value()
- << "', due to failure to fetch through the cache, "
- << "with error: " << entry.get().failure();
-
- result[uri] = None();
- }
- } else {
- // No entry means bypassing the cache.
- result[uri] = None();
+ futures.push_back(entry.get());
}
}
- return result;
+ return await(futures)
+ .then(defer(self(), [=]() {
+ // For each URI, if there is a potential cache entry and waiting
+ // for its associated future was successful, extract the entry
+ // from the future and store it in 'result'. Otherwise we assume
+ // we are not using or cannot use the cache for this URI.
+ hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>> result;
+
+ foreachpair (const CommandInfo::URI& uri,
+ const Option<Future<shared_ptr<Cache::Entry>>>& entry,
+ entries) {
+ if (entry.isSome()) {
+ if (entry.get().isReady()) {
+ result[uri] = entry.get().get();
+ } else {
+ LOG(WARNING)
+ << "Reverting to fetching directly into the sandbox for '"
+ << uri.value()
+ << "', due to failure to fetch through the cache, "
+ << "with error: " << entry.get().failure();
+
+ result[uri] = None();
+ }
+ } else {
+ // No entry means bypassing the cache.
+ result[uri] = None();
+ }
+ }
+
+ // NOTE: While we could inline '__fetch' we've explicitly kept
+ // it as a separate function to minimize complexity. Like with
+ // '_fetch', this also enables this phase of the fetcher cache
+ // to easily be mocked for testing!
+ return __fetch(result,
+ containerId,
+ sandboxDirectory,
+ cacheDirectory,
+ user,
+ flags);
+ }));
}
-Future<Nothing> FetcherProcess::___fetch(
+Future<Nothing> FetcherProcess::__fetch(
const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries,
const ContainerID& containerId,
const string& sandboxDirectory,
@@ -515,61 +511,50 @@ Future<Nothing> FetcherProcess::___fetch(
}
return run(containerId, info, flags)
- .repair(defer(self(), &FetcherProcess::__runFail, lambda::_1, entries))
- .then(defer(self(), &FetcherProcess::__runSucceed, entries));
-}
-
-
-Future<Nothing> FetcherProcess::__runFail(
- const Future<Nothing>& future,
- const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
- LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
-
- foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
- if (entry.isSome()) {
- entry.get()->unreference();
-
- if (entry.get()->completion().isPending()) {
- // Unsuccessfully (or partially) downloaded! Remove from the cache.
- entry.get()->fail();
- cache.remove(entry.get()); // Might delete partial download.
+ .repair(defer(self(), [=](const Future<Nothing>& future) {
+ LOG(ERROR) << "Failed to run mesos-fetcher: " << future.failure();
+
+ foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+ if (entry.isSome()) {
+ entry.get()->unreference();
+
+ if (entry.get()->completion().isPending()) {
+ // Unsuccessfully (or partially) downloaded! Remove from the cache.
+ entry.get()->fail();
+ cache.remove(entry.get()); // Might delete partial download.
+ }
+ }
}
- }
- }
-
- return future; // Always propagate the failure!
-}
-
-Future<Nothing> FetcherProcess::__runSucceed(
- const hashmap<CommandInfo::URI, Option<shared_ptr<Cache::Entry>>>& entries)
-{
- foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
- if (entry.isSome()) {
- entry.get()->unreference();
-
- if (entry.get()->completion().isPending()) {
- // Successfully downloaded and cached!
-
- Try<Nothing> adjust = cache.adjust(entry.get());
- if (adjust.isSome()) {
- entry.get()->complete();
- } else {
- LOG(WARNING) << "Failed to adjust the cache size for entry '"
- << entry.get()->key << "' with error: "
- << adjust.error();
-
- // Successfully fetched, but not reusable from the cache,
- // because we are deleting the entry now.
- entry.get()->fail();
- cache.remove(entry.get());
+ return future; // Always propagate the failure!
+ }))
+ .then(defer(self(), [=]() {
+ foreachvalue (const Option<shared_ptr<Cache::Entry>>& entry, entries) {
+ if (entry.isSome()) {
+ entry.get()->unreference();
+
+ if (entry.get()->completion().isPending()) {
+ // Successfully downloaded and cached!
+
+ Try<Nothing> adjust = cache.adjust(entry.get());
+ if (adjust.isSome()) {
+ entry.get()->complete();
+ } else {
+ LOG(WARNING) << "Failed to adjust the cache size for entry '"
+ << entry.get()->key << "' with error: "
+ << adjust.error();
+
+ // Successfully fetched, but not reusable from the
+ // cache, because we are deleting the entry now.
+ entry.get()->fail();
+ cache.remove(entry.get());
+ }
+ }
}
}
- }
- }
- return Nothing();
+ return Nothing();
+ }));
}
@@ -647,12 +632,10 @@ Bytes FetcherProcess::availableCacheSpace()
Future<shared_ptr<FetcherProcess::Cache::Entry>>
FetcherProcess::reserveCacheSpace(
- const Future<Try<Bytes>>& requestedSpace,
+ const Try<Bytes>& requestedSpace,
const shared_ptr<FetcherProcess::Cache::Entry>& entry)
{
- CHECK_READY(requestedSpace);
-
- if (requestedSpace.get().isError()) {
+ if (requestedSpace.isError()) {
// Let anyone waiting on this future know that we've
// failed to download and they should bypass the cache
// (any new requests will try again).
@@ -661,10 +644,10 @@ FetcherProcess::reserveCacheSpace(
return Failure("Could not determine size of cache file for '" +
entry->key + "' with error: " +
- requestedSpace.get().error());
+ requestedSpace.error());
}
- Try<Nothing> reservation = cache.reserve(requestedSpace.get().get());
+ Try<Nothing> reservation = cache.reserve(requestedSpace.get());
if (reservation.isError()) {
// Let anyone waiting on this future know that we've
@@ -679,12 +662,12 @@ FetcherProcess::reserveCacheSpace(
VLOG(1) << "Claiming fetcher cache space for: " << entry->key;
- cache.claimSpace(requestedSpace.get().get());
+ cache.claimSpace(requestedSpace.get());
- // NOTE: We must set the entry size only when are also claiming the
- // space! Other functions rely on this dependency (see
+ // NOTE: We must set the entry size only when we are also claiming
+ // the space! Other functions rely on this dependency (see
// Cache::remove()).
- entry->size = requestedSpace.get().get();
+ entry->size = requestedSpace.get();
return entry;
}
@@ -735,6 +718,9 @@ Future<Nothing> FetcherProcess::run(
<< (realpath.isError() ? realpath.error()
: "No such file or directory");
+ os::close(out.get());
+ os::close(err.get());
+
return Failure("Could not fetch URIs: failed to find mesos-fetcher");
}
@@ -761,12 +747,15 @@ Future<Nothing> FetcherProcess::run(
environment);
if (fetcherSubprocess.isError()) {
+ os::close(out.get());
+ os::close(err.get());
return Failure("Failed to execute mesos-fetcher: " +
fetcherSubprocess.error());
}
- // Remember this PID in case we need to kill the subprocess. See kill().
- // This value gets reset in __run().
+ // Remember this PID in case we need to kill the subprocess. See
+ // FetcherProcess::kill(). This value gets removed after we wait on
+ // the subprocess.
subprocessPids[containerId] = fetcherSubprocess.get().pid();
return fetcherSubprocess.get().status()
@@ -784,7 +773,7 @@ Future<Nothing> FetcherProcess::run(
return Nothing();
}))
- .onAny(defer(self(), [=](const Future<Nothing>& result) {
+ .onAny(defer(self(), [=](const Future<Nothing>&) {
// Clear the subprocess PID remembered from running mesos-fetcher.
subprocessPids.erase(containerId);
@@ -1058,7 +1047,7 @@ void FetcherProcess::Cache::setSpace(const Bytes& bytes)
{
if (space > 0) {
// Dynamic cache size changes not supported.
- CHECK(space == bytes);
+ CHECK_EQ(space, bytes);
} else {
space = bytes;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 3b63711..16553ff 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -284,8 +284,6 @@ public:
// Public and virtual for mock testing.
virtual process::Future<Nothing> _fetch(
- const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
- futures,
const hashmap<
CommandInfo::URI,
Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -311,14 +309,7 @@ public:
Bytes availableCacheSpace();
private:
- process::Future<hashmap<
- CommandInfo::URI,
- Option<std::shared_ptr<Cache::Entry>>>>
- __fetch(const hashmap<
- CommandInfo::URI,
- Option<process::Future<std::shared_ptr<Cache::Entry>>>>& entries);
-
- process::Future<Nothing> ___fetch(
+ process::Future<Nothing> __fetch(
const hashmap<CommandInfo::URI,
Option<std::shared_ptr<Cache::Entry>>>& entries,
const ContainerID& containerId,
@@ -327,26 +318,11 @@ private:
const Option<std::string>& user,
const Flags& flags);
- process::Future<Nothing> _run(
- const Option<int>& status,
- const ContainerID& containerId);
-
- void __run(const ContainerID& containerId, const int out, const int err);
-
- process::Future<Nothing> __runFail(
- const process::Future<Nothing>& future,
- const hashmap<CommandInfo::URI,
- Option<std::shared_ptr<Cache::Entry>>>& entries);
-
- process::Future<Nothing> __runSucceed(
- const hashmap<CommandInfo::URI,
- Option<std::shared_ptr<Cache::Entry>>>& entries);
-
// Calls Cache::reserve() and returns a ready entry future if successful,
// else Failure. Claims the space and assigns the entry's size to this
// amount if and only if successful.
process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
- const process::Future<Try<Bytes>>& requestedSpace,
+ const Try<Bytes>& requestedSpace,
const std::shared_ptr<Cache::Entry>& entry);
Cache cache;
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/fetcher_cache_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp
index 99777f8..bd53fc1 100644
--- a/src/tests/fetcher_cache_tests.cpp
+++ b/src/tests/fetcher_cache_tests.cpp
@@ -113,7 +113,10 @@ static const string ARCHIVED_COMMAND_SCRIPT =
class FetcherCacheTest : public MesosTest
{
public:
- struct Task {
+ // A helper struct that captures useful information for each of the
+ // tasks that we have launched to help test expectations.
+ struct Task
+ {
Path runDirectory;
Queue<TaskStatus> statusQueue;
};
@@ -386,7 +389,7 @@ FetcherCacheTest::Task FetcherCacheTest::launchTask(
offer.framework_id(),
executorId));
- return Task {path, taskStatusQueue};
+ return Task{path, taskStatusQueue};
}
@@ -439,7 +442,7 @@ vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks(
// When _fetch() is called, notify us by satisfying a promise that
// a task has passed the code stretch in which it competes for cache
// entries.
- EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _, _))
+ EXPECT_CALL(*fetcherProcess, _fetch(_, _, _, _, _, _))
.WillRepeatedly(
DoAll(SatisfyOne(&fetchContentionWaypoints),
Invoke(fetcherProcess, &MockFetcherProcess::unmocked__fetch)));
@@ -708,10 +711,17 @@ TEST_F(FetcherCacheTest, LocalCachedExtract)
class FetcherCacheHttpTest : public FetcherCacheTest
{
public:
- // A minimal HTTP server (not intended as an actor) just reusing what
- // is already implemented somewhere to serve some HTTP requests for
- // file downloads. Plus counting how many requests are made. Plus the
- // ability to pause answering requests, stalling them.
+ // A minimal HTTP server (NOTE: not written as an actor, but this is
+ // deprecated, see below) just reusing what is already implemented
+ // somewhere to serve some HTTP requests for file downloads. Plus
+ // counting how many requests are made. Plus the ability to pause
+ // answering requests, stalling them.
+ //
+ // TODO(bernd-mesos): This class follows a dangerous style of mixing
+ // actors and non-actors, DO NOT REPLICATE. Ultimately we want to
+ // replace this with a generic HTTP server that can be used by other
+ // tests as well and enables things like pausing requests,
+ // manipulating requests, mocking, etc.
class HttpServer : public Process<HttpServer>
{
public:
@@ -722,15 +732,11 @@ public:
{
provide(COMMAND_NAME, test->commandPath);
provide(ARCHIVE_NAME, test->archivePath);
-
- spawn(this);
}
string url()
{
- return "http://127.0.0.1:" +
- stringify(self().address.port) +
- "/" + self().id + "/";
+ return "http://" + stringify(self().address) + "/" + self().id + "/";
}
// Stalls the execution of HTTP requests inside visit().
@@ -746,6 +752,14 @@ public:
virtual void visit(const HttpEvent& event)
{
+ // TODO(bernd-mesos): Don't use locks here because we'll
+ // actually block libprocess threads which could cause a
+ // deadlock if we have a test with too many requests that we
+ // don't have enough threads to run other actors! Instead,
+ // consider asynchronously deferring the actual execution of
+ // this function via a Queue. This is currently non-trivial
+ // because we can't copy an HttpEvent so we're _forced_ to block
+ // the thread synchronously.
std::lock_guard<std::mutex> lock(mutex);
countRequests++;
@@ -776,12 +790,12 @@ public:
std::mutex mutex;
};
-
virtual void SetUp()
{
FetcherCacheTest::SetUp();
httpServer = new HttpServer(this);
+ spawn(httpServer);
}
virtual void TearDown()
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index d7a3c06..830d362 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -450,7 +450,7 @@ void MockSlave::unmocked___recover(const Future<Nothing>& future)
MockFetcherProcess::MockFetcherProcess()
{
// Set up default behaviors, calling the original methods.
- EXPECT_CALL(*this, _fetch(_, _, _, _, _, _, _)).
+ EXPECT_CALL(*this, _fetch(_, _, _, _, _, _)).
WillRepeatedly(
Invoke(this, &MockFetcherProcess::unmocked__fetch));
EXPECT_CALL(*this, run(_, _, _)).
@@ -459,7 +459,6 @@ MockFetcherProcess::MockFetcherProcess()
process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
- const list<Future<shared_ptr<Cache::Entry>>> futures,
const hashmap<CommandInfo::URI, Option<Future<shared_ptr<Cache::Entry>>>>&
entries,
const ContainerID& containerId,
@@ -469,7 +468,6 @@ process::Future<Nothing> MockFetcherProcess::unmocked__fetch(
const slave::Flags& flags)
{
return slave::FetcherProcess::_fetch(
- futures,
entries,
containerId,
sandboxDirectory,
http://git-wip-us.apache.org/repos/asf/mesos/blob/7aede4ad/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index a1c6ae4..4fcf0b7 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -810,9 +810,7 @@ public:
virtual ~MockFetcherProcess() {}
- MOCK_METHOD7(_fetch, process::Future<Nothing>(
- const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
- futures,
+ MOCK_METHOD6(_fetch, process::Future<Nothing>(
const hashmap<
CommandInfo::URI,
Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
@@ -824,8 +822,6 @@ public:
const slave::Flags& flags));
process::Future<Nothing> unmocked__fetch(
- const std::list<process::Future<std::shared_ptr<Cache::Entry>>>
- futures,
const hashmap<
CommandInfo::URI,
Option<process::Future<std::shared_ptr<Cache::Entry>>>>&