You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/26 18:57:46 UTC

[21/28] Refactored base 'State' implementation to be serialization agnostic and use a 'Storage' instance. Changed the LevelDB and ZooKeeper implementations to implement 'Storage' instead of 'State'. Provided a protobuf specific implementation on top of '

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index 2e75673..ca12499 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -31,14 +31,15 @@
 #include <stout/gtest.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
+#include <stout/try.hpp>
 
 #include "common/type_utils.hpp"
 
-#include "messages/messages.hpp"
+#include "master/registry.hpp"
 
 #include "state/leveldb.hpp"
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/protobuf.hpp"
+#include "state/storage.hpp"
 #include "state/zookeeper.hpp"
 
 #ifdef MESOS_HAS_JAVA
@@ -47,166 +48,272 @@
 
 using namespace mesos;
 using namespace mesos::internal;
-using namespace mesos::internal::state;
+using namespace mesos::internal::tests;
 
 using namespace process;
 
+using mesos::internal::registry::Registry;
+using mesos::internal::registry::Slave;
 
-void GetSetGet(State<ProtobufSerializer>* state)
-{
-  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+using state::LevelDBStorage;
+using state::Storage;
+#ifdef MESOS_HAS_JAVA
+using state::ZooKeeperStorage;
+#endif
 
-  Variable<Slaves> slaves1 = variable.get();
-  EXPECT_TRUE(slaves1->infos().size() == 0);
+using state::protobuf::State;
+using state::protobuf::Variable;
 
-  SlaveInfo info;
-  info.set_hostname("localhost");
-  info.set_webui_hostname("localhost");
 
-  slaves1->add_infos()->MergeFrom(info);
+void FetchAndStoreAndFetch(State* state)
+{
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+  Variable<Registry> variable = future1.get();
 
-  result.await();
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
 
-  ASSERT_TRUE(result.isReady());
-  ASSERT_SOME(result.get());
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
 
-  variable = state->get<Slaves>("slaves");
+  variable = variable.mutate(registry1);
 
-  variable.await();
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  ASSERT_TRUE(variable.isReady());
+  future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  Variable<Slaves> slaves2 = variable.get();
+  variable = future1.get();
 
-  ASSERT_TRUE(slaves2->infos().size() == 1);
-  EXPECT_EQ("localhost", slaves2->infos(0).hostname());
-  EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+  Registry registry2 = variable.get();
+  ASSERT_TRUE(registry2.slaves().size() == 1);
+  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
 }
 
 
-void GetSetSetGet(State<ProtobufSerializer>* state)
+void FetchAndStoreAndStoreAndFetch(State* state)
 {
-  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
+
+  Variable<Registry> variable = future1.get();
+
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
+
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
+
+  variable = variable.mutate(registry1);
 
-  Variable<Slaves> slaves1 = variable.get();
-  EXPECT_TRUE(slaves1->infos().size() == 0);
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  SlaveInfo info;
-  info.set_hostname("localhost");
-  info.set_webui_hostname("localhost");
+  variable = future2.get().get();
 
-  slaves1->add_infos()->MergeFrom(info);
+  future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+  future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  result.await();
+  variable = future1.get();
 
-  ASSERT_TRUE(result.isReady());
-  ASSERT_SOME(result.get());
+  Registry registry2 = variable.get();
+  ASSERT_TRUE(registry2.slaves().size() == 1);
+  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
+}
+
+
+void FetchAndStoreAndStoreFailAndFetch(State* state)
+{
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  slaves1 = result.get().get();
+  Variable<Registry> variable1 = future1.get();
 
-  result = state->set(slaves1);
+  Registry registry1 = variable1.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
 
-  result.await();
+  Slave* slave1 = registry1.add_slaves();
+  slave1->mutable_info()->set_hostname("localhost1");
+  slave1->mutable_info()->set_webui_hostname("localhost1");
 
-  ASSERT_TRUE(result.isReady());
-  ASSERT_SOME(result.get());
+  Variable<Registry> variable2 = variable1.mutate(registry1);
 
-  variable = state->get<Slaves>("slaves");
+  Future<Option<Variable<Registry> > > future2 = state->store(variable2);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  variable.await();
+  Registry registry2 = variable1.get();
+  EXPECT_TRUE(registry2.slaves().size() == 0);
 
-  ASSERT_TRUE(variable.isReady());
+  Slave* slave2 = registry2.add_slaves();
+  slave2->mutable_info()->set_hostname("localhost2");
+  slave2->mutable_info()->set_webui_hostname("localhost2");
 
-  Variable<Slaves> slaves2 = variable.get();
+  variable2 = variable1.mutate(registry2);
 
-  ASSERT_TRUE(slaves2->infos().size() == 1);
-  EXPECT_EQ("localhost", slaves2->infos(0).hostname());
-  EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+  future2 = state->store(variable2);
+  AWAIT_READY(future2);
+  EXPECT_TRUE(future2.get().isNone());
+
+  future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
+
+  variable1 = future1.get();
+
+  registry1 = variable1.get();
+  ASSERT_TRUE(registry1.slaves().size() == 1);
+  EXPECT_EQ("localhost1", registry1.slaves(0).info().hostname());
+  EXPECT_EQ("localhost1", registry1.slaves(0).info().webui_hostname());
 }
 
 
-void GetGetSetSetGet(State<ProtobufSerializer>* state)
+void FetchAndStoreAndExpungeAndFetch(State* state)
 {
-  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  Variable<Slaves> slaves1 = variable.get();
-  EXPECT_TRUE(slaves1->infos().size() == 0);
+  Variable<Registry> variable = future1.get();
 
-  variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
 
-  Variable<Slaves> slaves2 = variable.get();
-  EXPECT_TRUE(slaves2->infos().size() == 0);
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
 
-  SlaveInfo info2;
-  info2.set_hostname("localhost2");
-  info2.set_webui_hostname("localhost2");
+  variable = variable.mutate(registry1);
 
-  slaves2->add_infos()->MergeFrom(info2);
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  Future<Option<Variable<Slaves> > > result = state->set(slaves2);
+  variable = future2.get().get();
 
-  result.await();
+  Future<bool> future3 = state->expunge(variable);
+  AWAIT_READY(future3);
+  ASSERT_TRUE(future3.get());
 
-  ASSERT_TRUE(result.isReady());
-  ASSERT_SOME(result.get());
+  future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  SlaveInfo info1;
-  info1.set_hostname("localhost1");
-  info1.set_webui_hostname("localhost1");
+  variable = future1.get();
 
-  slaves1->add_infos()->MergeFrom(info1);
+  Registry registry2 = variable.get();
+  ASSERT_EQ(0, registry2.slaves().size());
+}
 
-  result = state->set(slaves1);
-  AWAIT_READY(result);
-  EXPECT_TRUE(result.get().isNone());
 
-  variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+void FetchAndStoreAndExpungeAndExpunge(State* state)
+{
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
+
+  Variable<Registry> variable = future1.get();
+
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
+
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
 
-  slaves1 = variable.get();
+  variable = variable.mutate(registry1);
 
-  ASSERT_TRUE(slaves1->infos().size() == 1);
-  EXPECT_EQ("localhost2", slaves1->infos(0).hostname());
-  EXPECT_EQ("localhost2", slaves1->infos(0).webui_hostname());
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
+
+  variable = future2.get().get();
+
+  Future<bool> future3 = state->expunge(variable);
+  AWAIT_READY(future3);
+  ASSERT_TRUE(future3.get());
+
+  future3 = state->expunge(variable);
+  AWAIT_READY(future3);
+  ASSERT_FALSE(future3.get());
 }
 
 
-void Names(State<ProtobufSerializer>* state)
+void FetchAndStoreAndExpungeAndStoreAndFetch(State* state)
 {
-  Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-  AWAIT_READY(variable);
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
 
-  Variable<Slaves> slaves1 = variable.get();
-  EXPECT_TRUE(slaves1->infos().size() == 0);
+  Variable<Registry> variable = future1.get();
 
-  SlaveInfo info;
-  info.set_hostname("localhost");
-  info.set_webui_hostname("localhost");
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
 
-  slaves1->add_infos()->MergeFrom(info);
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
 
-  Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+  variable = variable.mutate(registry1);
 
-  result.await();
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
 
-  ASSERT_TRUE(result.isReady());
-  EXPECT_SOME(result.get());
+  variable = future2.get().get();
 
-  Future<std::vector<std::string> > names = state->names();
+  Future<bool> future3 = state->expunge(variable);
+  AWAIT_READY(future3);
+  ASSERT_TRUE(future3.get());
+
+  future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
+
+  future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
+
+  variable = future1.get();
+
+  Registry registry2 = variable.get();
+  ASSERT_TRUE(registry2.slaves().size() == 1);
+  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
+}
+
+
+void Names(State* state)
+{
+  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  AWAIT_READY(future1);
+
+  Variable<Registry> variable = future1.get();
+
+  Registry registry1 = variable.get();
+  EXPECT_TRUE(registry1.slaves().size() == 0);
+
+  Slave* slave = registry1.add_slaves();
+  slave->mutable_info()->set_hostname("localhost");
+  slave->mutable_info()->set_webui_hostname("localhost");
 
-  names.await();
+  variable = variable.mutate(registry1);
 
-  ASSERT_TRUE(names.isReady());
+  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  AWAIT_READY(future2);
+  ASSERT_SOME(future2.get());
+
+  Future<std::vector<std::string> > names = state->names();
+  AWAIT_READY(names);
   ASSERT_TRUE(names.get().size() == 1);
-  EXPECT_EQ("slaves", names.get()[0]);
+  EXPECT_EQ("registry", names.get()[0]);
 }
 
 
@@ -214,43 +321,66 @@ class LevelDBStateTest : public ::testing::Test
 {
 public:
   LevelDBStateTest()
-    : state(NULL), path(os::getcwd() + "/.state") {}
+    : storage(NULL),
+      state(NULL),
+      path(os::getcwd() + "/.state") {}
 
 protected:
   virtual void SetUp()
   {
     os::rmdir(path);
-    state = new LevelDBState<ProtobufSerializer>(path);
+    storage = new state::LevelDBStorage(path);
+    state = new State(storage);
   }
 
   virtual void TearDown()
   {
     delete state;
+    delete storage;
     os::rmdir(path);
   }
 
-  State<ProtobufSerializer>* state;
+  state::Storage* storage;
+  State* state;
 
 private:
   const std::string path;
 };
 
 
-TEST_F(LevelDBStateTest, GetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndFetch)
+{
+  FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndStoreAndFetch)
+{
+  FetchAndStoreAndStoreAndFetch(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndStoreFailAndFetch)
 {
-  GetSetGet(state);
+  FetchAndStoreAndStoreFailAndFetch(state);
 }
 
 
-TEST_F(LevelDBStateTest, GetSetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndFetch)
 {
-  GetSetSetGet(state);
+  FetchAndStoreAndExpungeAndFetch(state);
 }
 
 
-TEST_F(LevelDBStateTest, GetGetSetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndExpunge)
 {
-  GetGetSetSetGet(state);
+  FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+  FetchAndStoreAndExpungeAndStoreAndFetch(state);
 }
 
 
@@ -265,45 +395,68 @@ class ZooKeeperStateTest : public tests::ZooKeeperTest
 {
 public:
   ZooKeeperStateTest()
-    : state(NULL) {}
+    : storage(NULL),
+      state(NULL) {}
 
 protected:
   virtual void SetUp()
   {
     ZooKeeperTest::SetUp();
-    state = new ZooKeeperState<ProtobufSerializer>(
+    storage = new state::ZooKeeperStorage(
         server->connectString(),
         NO_TIMEOUT,
         "/state/");
+    state = new State(storage);
   }
 
   virtual void TearDown()
   {
     delete state;
+    delete storage;
     ZooKeeperTest::TearDown();
   }
 
-  State<ProtobufSerializer>* state;
+  state::Storage* storage;
+  State* state;
 };
 
 
-TEST_F(ZooKeeperStateTest, GetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndFetch)
+{
+  FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreAndFetch)
 {
-  GetSetGet(state);
+  FetchAndStoreAndStoreAndFetch(state);
 }
 
 
-TEST_F(ZooKeeperStateTest, GetSetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreFailAndFetch)
 {
-  GetSetSetGet(state);
+  FetchAndStoreAndStoreFailAndFetch(state);
 }
 
 
-TEST_F(ZooKeeperStateTest, GetGetSetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndFetch)
 {
-  GetGetSetSetGet(state);
+  FetchAndStoreAndExpungeAndFetch(state);
 }
 
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndExpunge)
+{
+  FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+  FetchAndStoreAndExpungeAndStoreAndFetch(state);
+}
+
+
 TEST_F(ZooKeeperStateTest, Names)
 {
   Names(state);