You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/26 18:57:46 UTC
[21/28] Refactored base 'State' implementation to be serialization
agnostic and use a 'Storage' instance. Changed the LevelDB and ZooKeeper
implementations to implement 'Storage' instead of 'State'. Provided a
protobuf-specific implementation on top of 'State'.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index 2e75673..ca12499 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -31,14 +31,15 @@
#include <stout/gtest.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
+#include <stout/try.hpp>
#include "common/type_utils.hpp"
-#include "messages/messages.hpp"
+#include "master/registry.hpp"
#include "state/leveldb.hpp"
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/protobuf.hpp"
+#include "state/storage.hpp"
#include "state/zookeeper.hpp"
#ifdef MESOS_HAS_JAVA
@@ -47,166 +48,272 @@
using namespace mesos;
using namespace mesos::internal;
-using namespace mesos::internal::state;
+using namespace mesos::internal::tests;
using namespace process;
+using mesos::internal::registry::Registry;
+using mesos::internal::registry::Slave;
-void GetSetGet(State<ProtobufSerializer>* state)
-{
- Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+using state::LevelDBStorage;
+using state::Storage;
+#ifdef MESOS_HAS_JAVA
+using state::ZooKeeperStorage;
+#endif
- Variable<Slaves> slaves1 = variable.get();
- EXPECT_TRUE(slaves1->infos().size() == 0);
+using state::protobuf::State;
+using state::protobuf::Variable;
- SlaveInfo info;
- info.set_hostname("localhost");
- info.set_webui_hostname("localhost");
- slaves1->add_infos()->MergeFrom(info);
+void FetchAndStoreAndFetch(State* state)
+{
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+ Variable<Registry> variable = future1.get();
- result.await();
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
- ASSERT_TRUE(result.isReady());
- ASSERT_SOME(result.get());
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
- variable = state->get<Slaves>("slaves");
+ variable = variable.mutate(registry1);
- variable.await();
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- ASSERT_TRUE(variable.isReady());
+ future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- Variable<Slaves> slaves2 = variable.get();
+ variable = future1.get();
- ASSERT_TRUE(slaves2->infos().size() == 1);
- EXPECT_EQ("localhost", slaves2->infos(0).hostname());
- EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+ Registry registry2 = variable.get();
+ ASSERT_TRUE(registry2.slaves().size() == 1);
+ EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+ EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
}
-void GetSetSetGet(State<ProtobufSerializer>* state)
+void FetchAndStoreAndStoreAndFetch(State* state)
{
- Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
+
+ Variable<Registry> variable = future1.get();
+
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
+
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
+
+ variable = variable.mutate(registry1);
- Variable<Slaves> slaves1 = variable.get();
- EXPECT_TRUE(slaves1->infos().size() == 0);
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- SlaveInfo info;
- info.set_hostname("localhost");
- info.set_webui_hostname("localhost");
+ variable = future2.get().get();
- slaves1->add_infos()->MergeFrom(info);
+ future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+ future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- result.await();
+ variable = future1.get();
- ASSERT_TRUE(result.isReady());
- ASSERT_SOME(result.get());
+ Registry registry2 = variable.get();
+ ASSERT_TRUE(registry2.slaves().size() == 1);
+ EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+ EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
+}
+
+
+void FetchAndStoreAndStoreFailAndFetch(State* state)
+{
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- slaves1 = result.get().get();
+ Variable<Registry> variable1 = future1.get();
- result = state->set(slaves1);
+ Registry registry1 = variable1.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
- result.await();
+ Slave* slave1 = registry1.add_slaves();
+ slave1->mutable_info()->set_hostname("localhost1");
+ slave1->mutable_info()->set_webui_hostname("localhost1");
- ASSERT_TRUE(result.isReady());
- ASSERT_SOME(result.get());
+ Variable<Registry> variable2 = variable1.mutate(registry1);
- variable = state->get<Slaves>("slaves");
+ Future<Option<Variable<Registry> > > future2 = state->store(variable2);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- variable.await();
+ Registry registry2 = variable1.get();
+ EXPECT_TRUE(registry2.slaves().size() == 0);
- ASSERT_TRUE(variable.isReady());
+ Slave* slave2 = registry2.add_slaves();
+ slave2->mutable_info()->set_hostname("localhost2");
+ slave2->mutable_info()->set_webui_hostname("localhost2");
- Variable<Slaves> slaves2 = variable.get();
+ variable2 = variable1.mutate(registry2);
- ASSERT_TRUE(slaves2->infos().size() == 1);
- EXPECT_EQ("localhost", slaves2->infos(0).hostname());
- EXPECT_EQ("localhost", slaves2->infos(0).webui_hostname());
+ future2 = state->store(variable2);
+ AWAIT_READY(future2);
+ EXPECT_TRUE(future2.get().isNone());
+
+ future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
+
+ variable1 = future1.get();
+
+ registry1 = variable1.get();
+ ASSERT_TRUE(registry1.slaves().size() == 1);
+ EXPECT_EQ("localhost1", registry1.slaves(0).info().hostname());
+ EXPECT_EQ("localhost1", registry1.slaves(0).info().webui_hostname());
}
-void GetGetSetSetGet(State<ProtobufSerializer>* state)
+void FetchAndStoreAndExpungeAndFetch(State* state)
{
- Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- Variable<Slaves> slaves1 = variable.get();
- EXPECT_TRUE(slaves1->infos().size() == 0);
+ Variable<Registry> variable = future1.get();
- variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
- Variable<Slaves> slaves2 = variable.get();
- EXPECT_TRUE(slaves2->infos().size() == 0);
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
- SlaveInfo info2;
- info2.set_hostname("localhost2");
- info2.set_webui_hostname("localhost2");
+ variable = variable.mutate(registry1);
- slaves2->add_infos()->MergeFrom(info2);
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- Future<Option<Variable<Slaves> > > result = state->set(slaves2);
+ variable = future2.get().get();
- result.await();
+ Future<bool> future3 = state->expunge(variable);
+ AWAIT_READY(future3);
+ ASSERT_TRUE(future3.get());
- ASSERT_TRUE(result.isReady());
- ASSERT_SOME(result.get());
+ future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- SlaveInfo info1;
- info1.set_hostname("localhost1");
- info1.set_webui_hostname("localhost1");
+ variable = future1.get();
- slaves1->add_infos()->MergeFrom(info1);
+ Registry registry2 = variable.get();
+ ASSERT_EQ(0, registry2.slaves().size());
+}
- result = state->set(slaves1);
- AWAIT_READY(result);
- EXPECT_TRUE(result.get().isNone());
- variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+void FetchAndStoreAndExpungeAndExpunge(State* state)
+{
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
+
+ Variable<Registry> variable = future1.get();
+
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
+
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
- slaves1 = variable.get();
+ variable = variable.mutate(registry1);
- ASSERT_TRUE(slaves1->infos().size() == 1);
- EXPECT_EQ("localhost2", slaves1->infos(0).hostname());
- EXPECT_EQ("localhost2", slaves1->infos(0).webui_hostname());
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
+
+ variable = future2.get().get();
+
+ Future<bool> future3 = state->expunge(variable);
+ AWAIT_READY(future3);
+ ASSERT_TRUE(future3.get());
+
+ future3 = state->expunge(variable);
+ AWAIT_READY(future3);
+ ASSERT_FALSE(future3.get());
}
-void Names(State<ProtobufSerializer>* state)
+void FetchAndStoreAndExpungeAndStoreAndFetch(State* state)
{
- Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
- AWAIT_READY(variable);
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
- Variable<Slaves> slaves1 = variable.get();
- EXPECT_TRUE(slaves1->infos().size() == 0);
+ Variable<Registry> variable = future1.get();
- SlaveInfo info;
- info.set_hostname("localhost");
- info.set_webui_hostname("localhost");
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
- slaves1->add_infos()->MergeFrom(info);
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
- Future<Option<Variable<Slaves> > > result = state->set(slaves1);
+ variable = variable.mutate(registry1);
- result.await();
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
- ASSERT_TRUE(result.isReady());
- EXPECT_SOME(result.get());
+ variable = future2.get().get();
- Future<std::vector<std::string> > names = state->names();
+ Future<bool> future3 = state->expunge(variable);
+ AWAIT_READY(future3);
+ ASSERT_TRUE(future3.get());
+
+ future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
+
+ future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
+
+ variable = future1.get();
+
+ Registry registry2 = variable.get();
+ ASSERT_TRUE(registry2.slaves().size() == 1);
+ EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+ EXPECT_EQ("localhost", registry2.slaves(0).info().webui_hostname());
+}
+
+
+void Names(State* state)
+{
+ Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+ AWAIT_READY(future1);
+
+ Variable<Registry> variable = future1.get();
+
+ Registry registry1 = variable.get();
+ EXPECT_TRUE(registry1.slaves().size() == 0);
+
+ Slave* slave = registry1.add_slaves();
+ slave->mutable_info()->set_hostname("localhost");
+ slave->mutable_info()->set_webui_hostname("localhost");
- names.await();
+ variable = variable.mutate(registry1);
- ASSERT_TRUE(names.isReady());
+ Future<Option<Variable<Registry> > > future2 = state->store(variable);
+ AWAIT_READY(future2);
+ ASSERT_SOME(future2.get());
+
+ Future<std::vector<std::string> > names = state->names();
+ AWAIT_READY(names);
ASSERT_TRUE(names.get().size() == 1);
- EXPECT_EQ("slaves", names.get()[0]);
+ EXPECT_EQ("registry", names.get()[0]);
}
@@ -214,43 +321,66 @@ class LevelDBStateTest : public ::testing::Test
{
public:
LevelDBStateTest()
- : state(NULL), path(os::getcwd() + "/.state") {}
+ : storage(NULL),
+ state(NULL),
+ path(os::getcwd() + "/.state") {}
protected:
virtual void SetUp()
{
os::rmdir(path);
- state = new LevelDBState<ProtobufSerializer>(path);
+ storage = new state::LevelDBStorage(path);
+ state = new State(storage);
}
virtual void TearDown()
{
delete state;
+ delete storage;
os::rmdir(path);
}
- State<ProtobufSerializer>* state;
+ state::Storage* storage;
+ State* state;
private:
const std::string path;
};
-TEST_F(LevelDBStateTest, GetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndFetch)
+{
+ FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndStoreAndFetch)
+{
+ FetchAndStoreAndStoreAndFetch(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndStoreFailAndFetch)
{
- GetSetGet(state);
+ FetchAndStoreAndStoreFailAndFetch(state);
}
-TEST_F(LevelDBStateTest, GetSetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndFetch)
{
- GetSetSetGet(state);
+ FetchAndStoreAndExpungeAndFetch(state);
}
-TEST_F(LevelDBStateTest, GetGetSetSetGet)
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndExpunge)
{
- GetGetSetSetGet(state);
+ FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(LevelDBStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+ FetchAndStoreAndExpungeAndStoreAndFetch(state);
}
@@ -265,45 +395,68 @@ class ZooKeeperStateTest : public tests::ZooKeeperTest
{
public:
ZooKeeperStateTest()
- : state(NULL) {}
+ : storage(NULL),
+ state(NULL) {}
protected:
virtual void SetUp()
{
ZooKeeperTest::SetUp();
- state = new ZooKeeperState<ProtobufSerializer>(
+ storage = new state::ZooKeeperStorage(
server->connectString(),
NO_TIMEOUT,
"/state/");
+ state = new State(storage);
}
virtual void TearDown()
{
delete state;
+ delete storage;
ZooKeeperTest::TearDown();
}
- State<ProtobufSerializer>* state;
+ state::Storage* storage;
+ State* state;
};
-TEST_F(ZooKeeperStateTest, GetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndFetch)
+{
+ FetchAndStoreAndFetch(state);
+}
+
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreAndFetch)
{
- GetSetGet(state);
+ FetchAndStoreAndStoreAndFetch(state);
}
-TEST_F(ZooKeeperStateTest, GetSetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndStoreFailAndFetch)
{
- GetSetSetGet(state);
+ FetchAndStoreAndStoreFailAndFetch(state);
}
-TEST_F(ZooKeeperStateTest, GetGetSetSetGet)
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndFetch)
{
- GetGetSetSetGet(state);
+ FetchAndStoreAndExpungeAndFetch(state);
}
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndExpunge)
+{
+ FetchAndStoreAndExpungeAndExpunge(state);
+}
+
+
+TEST_F(ZooKeeperStateTest, FetchAndStoreAndExpungeAndStoreAndFetch)
+{
+ FetchAndStoreAndExpungeAndStoreAndFetch(state);
+}
+
+
TEST_F(ZooKeeperStateTest, Names)
{
Names(state);