You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/04/24 02:53:24 UTC
git commit: Added timeouts to registrar.
Repository: mesos
Updated Branches:
refs/heads/master 7e59962eb -> abba9d11c
Added timeouts to registrar.
Review: https://reviews.apache.org/r/20374
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/abba9d11
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/abba9d11
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/abba9d11
Branch: refs/heads/master
Commit: abba9d11cf9e7ffda06013b2abb63620418d6dfd
Parents: 7e59962
Author: Benjamin Hindman <be...@berkeley.edu>
Authored: Wed Apr 23 17:22:09 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Apr 23 17:28:15 2014 -0700
----------------------------------------------------------------------
src/master/flags.hpp | 14 ++++++
src/master/registrar.cpp | 29 +++++++++++--
src/tests/registrar_tests.cpp | 89 +++++++++++++++++++++++++++++++++++---
3 files changed, 123 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index acf3963..c83292f 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -74,6 +74,18 @@ public:
"bootstrap the persistent state on a running cluster.",
false);
+ add(&Flags::registry_fetch_timeout,
+ "registry_fetch_timeout",
+ "Duration of time to wait in order to fetch data from the registry\n"
+ "after which the operation is considered a failure.",
+ Seconds(60));
+
+ add(&Flags::registry_store_timeout,
+ "registry_store_timeout",
+ "Duration of time to wait in order to store data in the registry\n"
+ "after which the operation is considered a failure.",
+ Seconds(5));
+
// TODO(bmahler): Add a 'Percentage' abstraction for flags.
// TODO(bmahler): Add a --production flag for production defaults.
add(&Flags::recovery_slave_removal_limit,
@@ -161,6 +173,8 @@ public:
std::string work_dir;
std::string registry;
bool registry_strict;
+ Duration registry_fetch_timeout;
+ Duration registry_store_timeout;
std::string recovery_slave_removal_limit;
std::string webui_dir;
std::string whitelist;
http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index ac65f20..8e7760b 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -196,6 +196,20 @@ private:
};
+// Helper for treating State operations that timeout as failures.
+template <typename T>
+Future<T> timeout(
+ const string& operation,
+ const Duration& duration,
+ Future<T> future)
+{
+ future.discard();
+
+ return Failure(
+ "Failed to perform " + operation + " within " + stringify(duration));
+}
+
+
Future<Response> RegistrarProcess::registry(const Request& request)
{
JSON::Object result;
@@ -270,9 +284,14 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
LOG(INFO) << "Recovering registrar";
if (recovered.isNone()) {
- // TODO(benh): Don't wait forever to recover?
timers.state_fetch.start();
state->fetch<Registry>("registry")
+ .after(flags.registry_fetch_timeout,
+ lambda::bind(
+ &timeout<Variable<Registry> >,
+ "fetch",
+ flags.registry_fetch_timeout,
+ lambda::_1))
.onAny(defer(self(), &Self::_recover, info, lambda::_1));
updating = true;
recovered = Owned<Promise<Registry> >(new Promise<Registry>());
@@ -384,11 +403,15 @@ void RegistrarProcess::update()
(*operation)(&registry, &slaveIDs, flags.registry_strict);
}
- // TODO(benh): Add a timeout so we don't wait forever.
-
// Perform the store!
timers.state_store.start();
state->store(variable.get().mutate(registry))
+ .after(flags.registry_store_timeout,
+ lambda::bind(
+ &timeout<Option<Variable<Registry> > >,
+ "store",
+ flags.registry_store_timeout,
+ lambda::_1))
.onAny(defer(self(), &Self::_update, lambda::_1, operations));
// Clear the operations, _update will transition the Promises!
http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 7afa223..917a470 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -29,6 +29,7 @@
#include <stout/bytes.hpp>
#include <stout/stopwatch.hpp>
+#include <stout/uuid.hpp>
#include "common/protobuf_utils.hpp"
#include "common/type_utils.hpp"
@@ -38,6 +39,8 @@
#include "log/tool/initialize.hpp"
+#include "messages/state.hpp"
+
#include "master/flags.hpp"
#include "master/master.hpp"
#include "master/registrar.hpp"
@@ -53,6 +56,13 @@ using namespace mesos::internal;
using namespace process;
+using log::Log;
+using log::Replica;
+
+using state::Entry;
+using state::LogStorage;
+using state::Storage;
+
using state::protobuf::State;
using std::map;
@@ -61,7 +71,9 @@ using std::string;
using std::vector;
using testing::_;
+using testing::DoAll;
using testing::Eq;
+using testing::Return;
using mesos::internal::tests::TemporaryDirectoryTest;
@@ -102,13 +114,13 @@ protected:
// Only create the replica for 'path2' (i.e., the second replica)
// as the first replica will be created when we create a Log.
- replica2 = new log::Replica(path2);
+ replica2 = new Replica(path2);
set<UPID> pids;
pids.insert(replica2->pid());
- log = new log::Log(2, path1, pids);
- storage = new state::LogStorage(log);
+ log = new Log(2, path1, pids);
+ storage = new LogStorage(log);
state = new State(storage);
master.CopyFrom(protobuf::createMasterInfo(UPID("master@127.0.0.1:5050")));
@@ -124,11 +136,11 @@ protected:
TemporaryDirectoryTest::TearDown();
}
- log::Log* log;
- state::Storage* storage;
+ Log* log;
+ Storage* storage;
State* state;
- log::Replica* replica2;
+ Replica* replica2;
MasterInfo master;
Flags flags;
@@ -342,6 +354,71 @@ TEST_P(RegistrarTest, bootstrap)
}
+class MockStorage : public Storage
+{
+public:
+ MOCK_METHOD1(get, Future<Option<Entry> >(const string&));
+ MOCK_METHOD2(set, Future<bool>(const Entry&, const UUID&));
+ MOCK_METHOD1(expunge, Future<bool>(const Entry&));
+ MOCK_METHOD0(names, Future<std::set<string> >(void));
+};
+
+
+TEST_P(RegistrarTest, fetchTimeout)
+{
+ Clock::pause();
+
+ MockStorage storage;
+ State state(&storage);
+
+ Future<Nothing> get;
+ EXPECT_CALL(storage, get(_))
+ .WillOnce(DoAll(FutureSatisfy(&get),
+ Return(Future<Option<Entry> >())));
+
+ Registrar registrar(flags, &state);
+
+ Future<Registry> recover = registrar.recover(master);
+
+ AWAIT_READY(get);
+
+ Clock::advance(flags.registry_fetch_timeout);
+
+ AWAIT_FAILED(recover);
+
+ Clock::resume();
+}
+
+
+TEST_P(RegistrarTest, storeTimeout)
+{
+ Clock::pause();
+
+ MockStorage storage;
+ State state(&storage);
+
+ Registrar registrar(flags, &state);
+
+ EXPECT_CALL(storage, get(_))
+ .WillOnce(Return(None()));
+
+ Future<Nothing> set;
+ EXPECT_CALL(storage, set(_, _))
+ .WillOnce(DoAll(FutureSatisfy(&set),
+ Return(Future<bool>())));
+
+ Future<Registry> recover = registrar.recover(master);
+
+ AWAIT_READY(set);
+
+ Clock::advance(flags.registry_store_timeout);
+
+ AWAIT_FAILED(recover);
+
+ Clock::resume();
+}
+
+
class Registrar_BENCHMARK_Test : public RegistrarTestBase,
public WithParamInterface<size_t>
{};