You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/04/24 02:53:24 UTC

git commit: Added timeouts to registrar.

Repository: mesos
Updated Branches:
  refs/heads/master 7e59962eb -> abba9d11c


Added timeouts to registrar.

Review: https://reviews.apache.org/r/20374


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/abba9d11
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/abba9d11
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/abba9d11

Branch: refs/heads/master
Commit: abba9d11cf9e7ffda06013b2abb63620418d6dfd
Parents: 7e59962
Author: Benjamin Hindman <be...@berkeley.edu>
Authored: Wed Apr 23 17:22:09 2014 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Apr 23 17:28:15 2014 -0700

----------------------------------------------------------------------
 src/master/flags.hpp          | 14 ++++++
 src/master/registrar.cpp      | 29 +++++++++++--
 src/tests/registrar_tests.cpp | 89 +++++++++++++++++++++++++++++++++++---
 3 files changed, 123 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/master/flags.hpp
----------------------------------------------------------------------
diff --git a/src/master/flags.hpp b/src/master/flags.hpp
index acf3963..c83292f 100644
--- a/src/master/flags.hpp
+++ b/src/master/flags.hpp
@@ -74,6 +74,18 @@ public:
         "bootstrap the persistent state on a running cluster.",
         false);
 
+    add(&Flags::registry_fetch_timeout,
+        "registry_fetch_timeout",
+        "Duration of time to wait in order to fetch data from the registry\n"
+        "after which the operation is considered a failure.",
+        Seconds(60));
+
+    add(&Flags::registry_store_timeout,
+        "registry_store_timeout",
+        "Duration of time to wait in order to store data in the registry\n"
+        "after which the operation is considered a failure.",
+        Seconds(5));
+
     // TODO(bmahler): Add a 'Percentage' abstraction for flags.
     // TODO(bmahler): Add a --production flag for production defaults.
     add(&Flags::recovery_slave_removal_limit,
@@ -161,6 +173,8 @@ public:
   std::string work_dir;
   std::string registry;
   bool registry_strict;
+  Duration registry_fetch_timeout;
+  Duration registry_store_timeout;
   std::string recovery_slave_removal_limit;
   std::string webui_dir;
   std::string whitelist;

http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index ac65f20..8e7760b 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -196,6 +196,20 @@ private:
 };
 
 
+// Helper for treating State operations that time out as failures.
+template <typename T>
+Future<T> timeout(
+    const string& operation,
+    const Duration& duration,
+    Future<T> future)
+{
+  future.discard();
+
+  return Failure(
+      "Failed to perform " + operation + " within " + stringify(duration));
+}
+
+
 Future<Response> RegistrarProcess::registry(const Request& request)
 {
   JSON::Object result;
@@ -270,9 +284,14 @@ Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
   LOG(INFO) << "Recovering registrar";
 
   if (recovered.isNone()) {
-    // TODO(benh): Don't wait forever to recover?
     timers.state_fetch.start();
     state->fetch<Registry>("registry")
+      .after(flags.registry_fetch_timeout,
+             lambda::bind(
+                 &timeout<Variable<Registry> >,
+                 "fetch",
+                 flags.registry_fetch_timeout,
+                 lambda::_1))
       .onAny(defer(self(), &Self::_recover, info, lambda::_1));
     updating = true;
     recovered = Owned<Promise<Registry> >(new Promise<Registry>());
@@ -384,11 +403,15 @@ void RegistrarProcess::update()
     (*operation)(&registry, &slaveIDs, flags.registry_strict);
   }
 
-  // TODO(benh): Add a timeout so we don't wait forever.
-
   // Perform the store!
   timers.state_store.start();
   state->store(variable.get().mutate(registry))
+    .after(flags.registry_store_timeout,
+           lambda::bind(
+               &timeout<Option<Variable<Registry> > >,
+               "store",
+               flags.registry_store_timeout,
+               lambda::_1))
     .onAny(defer(self(), &Self::_update, lambda::_1, operations));
 
   // Clear the operations, _update will transition the Promises!

http://git-wip-us.apache.org/repos/asf/mesos/blob/abba9d11/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 7afa223..917a470 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -29,6 +29,7 @@
 
 #include <stout/bytes.hpp>
 #include <stout/stopwatch.hpp>
+#include <stout/uuid.hpp>
 
 #include "common/protobuf_utils.hpp"
 #include "common/type_utils.hpp"
@@ -38,6 +39,8 @@
 
 #include "log/tool/initialize.hpp"
 
+#include "messages/state.hpp"
+
 #include "master/flags.hpp"
 #include "master/master.hpp"
 #include "master/registrar.hpp"
@@ -53,6 +56,13 @@ using namespace mesos::internal;
 
 using namespace process;
 
+using log::Log;
+using log::Replica;
+
+using state::Entry;
+using state::LogStorage;
+using state::Storage;
+
 using state::protobuf::State;
 
 using std::map;
@@ -61,7 +71,9 @@ using std::string;
 using std::vector;
 
 using testing::_;
+using testing::DoAll;
 using testing::Eq;
+using testing::Return;
 
 using mesos::internal::tests::TemporaryDirectoryTest;
 
@@ -102,13 +114,13 @@ protected:
 
     // Only create the replica for 'path2' (i.e., the second replica)
     // as the first replica will be created when we create a Log.
-    replica2 = new log::Replica(path2);
+    replica2 = new Replica(path2);
 
     set<UPID> pids;
     pids.insert(replica2->pid());
 
-    log = new log::Log(2, path1, pids);
-    storage = new state::LogStorage(log);
+    log = new Log(2, path1, pids);
+    storage = new LogStorage(log);
     state = new State(storage);
 
     master.CopyFrom(protobuf::createMasterInfo(UPID("master@127.0.0.1:5050")));
@@ -124,11 +136,11 @@ protected:
     TemporaryDirectoryTest::TearDown();
   }
 
-  log::Log* log;
-  state::Storage* storage;
+  Log* log;
+  Storage* storage;
   State* state;
 
-  log::Replica* replica2;
+  Replica* replica2;
 
   MasterInfo master;
   Flags flags;
@@ -342,6 +354,71 @@ TEST_P(RegistrarTest, bootstrap)
 }
 
 
+class MockStorage : public Storage
+{
+public:
+  MOCK_METHOD1(get, Future<Option<Entry> >(const string&));
+  MOCK_METHOD2(set, Future<bool>(const Entry&, const UUID&));
+  MOCK_METHOD1(expunge, Future<bool>(const Entry&));
+  MOCK_METHOD0(names, Future<std::set<string> >(void));
+};
+
+
+TEST_P(RegistrarTest, fetchTimeout)
+{
+  Clock::pause();
+
+  MockStorage storage;
+  State state(&storage);
+
+  Future<Nothing> get;
+  EXPECT_CALL(storage, get(_))
+    .WillOnce(DoAll(FutureSatisfy(&get),
+                    Return(Future<Option<Entry> >())));
+
+  Registrar registrar(flags, &state);
+
+  Future<Registry> recover = registrar.recover(master);
+
+  AWAIT_READY(get);
+
+  Clock::advance(flags.registry_fetch_timeout);
+
+  AWAIT_FAILED(recover);
+
+  Clock::resume();
+}
+
+
+TEST_P(RegistrarTest, storeTimeout)
+{
+  Clock::pause();
+
+  MockStorage storage;
+  State state(&storage);
+
+  Registrar registrar(flags, &state);
+
+  EXPECT_CALL(storage, get(_))
+    .WillOnce(Return(None()));
+
+  Future<Nothing> set;
+  EXPECT_CALL(storage, set(_, _))
+    .WillOnce(DoAll(FutureSatisfy(&set),
+                    Return(Future<bool>())));
+
+  Future<Registry> recover = registrar.recover(master);
+
+  AWAIT_READY(set);
+
+  Clock::advance(flags.registry_store_timeout);
+
+  AWAIT_FAILED(recover);
+
+  Clock::resume();
+}
+
+
 class Registrar_BENCHMARK_Test : public RegistrarTestBase,
                                  public WithParamInterface<size_t>
 {};