You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/26 18:57:47 UTC
[22/28] git commit: Refactored base 'State' implementation to be
serialization agnostic and use a 'Storage' instance. Changed the LevelDB and
ZooKeeper implementations to implement 'Storage' instead of 'State'. Provided
a protobuf specific implementation
Refactored base 'State' implementation to be serialization agnostic
and use a 'Storage' instance. Changed the LevelDB and ZooKeeper
implementations to implement 'Storage' instead of 'State'. Provided a
protobuf specific implementation on top of 'State'. Updated code
(including JNI) and tests accordingly.
Review: https://reviews.apache.org/r/11308
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/33f4ff4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/33f4ff4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/33f4ff4b
Branch: refs/heads/master
Commit: 33f4ff4b138dbe11abd2014e1e2b44dd8444a0b3
Parents: 1e35e9e
Author: Benjamin Hindman <be...@twitter.com>
Authored: Mon Jan 14 15:45:29 2013 -0800
Committer: Benjamin Hindman <be...@twitter.com>
Committed: Sun May 26 09:28:36 2013 -0700
----------------------------------------------------------------------
src/Makefile.am | 20 +-
src/java/jni/org_apache_mesos_state_Variable.cpp | 22 +-
.../jni/org_apache_mesos_state_ZooKeeperState.cpp | 318 ++++++++++---
.../src/org/apache/mesos/state/InMemoryState.java | 21 +-
src/java/src/org/apache/mesos/state/State.java | 22 +-
.../src/org/apache/mesos/state/ZooKeeperState.java | 113 +++--
src/master/registry.hpp | 24 +
src/master/registry.proto | 32 ++
src/messages/messages.hpp | 40 ++
src/messages/messages.proto | 6 -
src/state/leveldb.cpp | 77 +++-
src/state/leveldb.hpp | 77 ++--
src/state/protobuf.hpp | 166 +++++++
src/state/serializer.hpp | 67 ---
src/state/state.hpp | 183 +++----
src/state/storage.hpp | 60 +++
src/state/zookeeper.cpp | 175 +++++--
src/state/zookeeper.hpp | 99 ++--
src/tests/state_tests.cpp | 383 ++++++++++-----
19 files changed, 1330 insertions(+), 575 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 0f7794e..7c740fd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -129,6 +129,10 @@ STATE_PROTOS = messages/state.pb.cc messages/state.pb.h
BUILT_SOURCES += $(STATE_PROTOS)
CLEANFILES += $(STATE_PROTOS)
+REGISTRY_PROTOS = master/registry.pb.cc master/registry.pb.h
+
+BUILT_SOURCES += $(REGISTRY_PROTOS)
+CLEANFILES += $(REGISTRY_PROTOS)
# Targets for generating protocol buffer code.
%.pb.cc %.pb.h: $(top_srcdir)/include/mesos/%.proto
@@ -153,7 +157,10 @@ $(PYTHON_PROTOS): $(MESOS_PROTO)
# libraries themselves.
noinst_LTLIBRARIES += libmesos_no_third_party.la
-nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
+nodist_libmesos_no_third_party_la_SOURCES = \
+ $(CXX_PROTOS) \
+ $(MESSAGES_PROTOS) \
+ $(REGISTRY_PROTOS)
libmesos_no_third_party_la_SOURCES = \
sched/sched.cpp \
@@ -162,6 +169,8 @@ libmesos_no_third_party_la_SOURCES = \
master/drf_sorter.cpp \
master/http.cpp \
master/master.cpp \
+ master/registry.hpp \
+ master/registry.proto \
slave/constants.cpp \
slave/gc.cpp \
slave/monitor.cpp \
@@ -278,8 +287,13 @@ libmesos_no_third_party_la_LIBADD += liblog.la
# include the leveldb headers.
noinst_LTLIBRARIES += libstate.la
libstate_la_SOURCES = state/leveldb.cpp state/zookeeper.cpp
-libstate_la_SOURCES += state/leveldb.hpp state/serializer.hpp \
- state/state.hpp state/zookeeper.hpp messages/state.hpp \
+libstate_la_SOURCES += \
+ state/leveldb.hpp \
+ state/protobuf.hpp \
+ state/state.hpp \
+ state/storage.hpp \
+ state/zookeeper.hpp \
+ messages/state.hpp \
messages/state.proto
nodist_libstate_la_SOURCES = $(STATE_PROTOS)
libstate_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/jni/org_apache_mesos_state_Variable.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_Variable.cpp b/src/java/jni/org_apache_mesos_state_Variable.cpp
index eacb110..4d840ce 100644
--- a/src/java/jni/org_apache_mesos_state_Variable.cpp
+++ b/src/java/jni/org_apache_mesos_state_Variable.cpp
@@ -20,13 +20,13 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_mesos_state_Variable_value
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- Variable<std::string>* variable =
- (Variable<std::string>*) env->GetLongField(thiz, __variable);
+ Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
+
+ const std::string& value = variable->value();
// byte[] value = ..;
- jbyteArray jvalue = env->NewByteArray((*variable)->size());
- env->SetByteArrayRegion(
- jvalue, 0, (*variable)->size(), (jbyte*) (*variable)->data());
+ jbyteArray jvalue = env->NewByteArray(value.size());
+ env->SetByteArrayRegion(jvalue, 0, value.size(), (jbyte*) value.data());
return jvalue;
}
@@ -44,15 +44,14 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_Variable_mutate
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- // Create a copy of the old variable to support the immutable Java API.
- Variable<std::string>* variable = new Variable<std::string>(
- *((Variable<std::string>*) env->GetLongField(thiz, __variable)));
+ Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
jbyte* value = env->GetByteArrayElements(jvalue, NULL);
jsize length = env->GetArrayLength(jvalue);
- // Update the value of the new copy.
- (*variable)->assign((const char*) value, length);
+ // Mutate the variable and save a copy of the result.
+ variable =
+ new Variable(variable->mutate(std::string((const char*) value, length)));
env->ReleaseByteArrayElements(jvalue, value, 0);
@@ -80,8 +79,7 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_Variable_finalize
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- Variable<std::string>* variable =
- (Variable<std::string>*) env->GetLongField(thiz, __variable);
+ Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
delete variable;
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
index 5abf3ef..c3282c0 100644
--- a/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
+++ b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
@@ -48,11 +48,16 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
string znode = construct<string>(env, jznode);
- // Create the C++ State and initialize the __state variable.
- State<>* state = new ZooKeeperState<>(servers, timeout, znode);
+ // Create the C++ Storage and State instances and initialize the
+ // __storage and __state variables.
+ Storage* storage = new ZooKeeperStorage(servers, timeout, znode);
+ State* state = new State(storage);
clazz = env->GetObjectClass(thiz);
+ jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+ env->SetLongField(thiz, __storage, (jlong) storage);
+
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
env->SetLongField(thiz, __state, (jlong) state);
}
@@ -87,7 +92,7 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
string znode = construct<string>(env, jznode);
// Create the C++ State.
- State<>* state = NULL;
+ Storage* storage = NULL;
if (jscheme != NULL && jcredentials != NULL) {
string scheme = construct<string>(env, jscheme);
@@ -100,16 +105,21 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
zookeeper::Authentication authentication(scheme, credentials);
- state = new ZooKeeperState<>(servers, timeout, znode, authentication);
+ storage = new ZooKeeperStorage(servers, timeout, znode, authentication);
} else {
- state = new ZooKeeperState<>(servers, timeout, znode);
+ storage = new ZooKeeperStorage(servers, timeout, znode);
}
- CHECK(state != NULL);
+ CHECK(storage != NULL);
+
+ State* state = new State(storage);
- // Initialize the __state variable.
+ // Initialize the __storage and __state variables.
clazz = env->GetObjectClass(thiz);
+ jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+ env->SetLongField(thiz, __storage, (jlong) storage);
+
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
env->SetLongField(thiz, __state, (jlong) state);
}
@@ -127,18 +137,24 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_finalize
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
- State<>* state = (State<>*) env->GetLongField(thiz, __state);
+ State* state = (State*) env->GetLongField(thiz, __state);
delete state;
+
+ jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+
+ Storage* storage = (Storage*) env->GetLongField(thiz, __storage);
+
+ delete storage;
}
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get
+ * Method: __fetch
* Signature: (Ljava/lang/String;)J
*/
-JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch
(JNIEnv* env, jobject thiz, jstring jname)
{
string name = construct<string>(env, jname);
@@ -147,10 +163,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
- State<>* state = (State<>*) env->GetLongField(thiz, __state);
+ State* state = (State*) env->GetLongField(thiz, __state);
- Future<Variable<string> >* future =
- new Future<Variable<string> >(state->get<string>(name));
+ Future<Variable>* future = new Future<Variable>(state->fetch(name));
return (jlong) future;
}
@@ -158,13 +173,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_cancel
+ * Method: __fetch_cancel
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1cancel
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1cancel
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
if (!future->isDiscarded()) {
future->discard();
@@ -177,13 +192,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1c
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_is_cancelled
+ * Method: __fetch_is_cancelled
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1cancelled
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
return (jboolean) future->isDiscarded();
}
@@ -191,13 +206,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1i
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_is_done
+ * Method: __fetch_is_done
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1done
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1is_1done
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
return (jboolean) !future->isPending();
}
@@ -205,13 +220,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1i
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_get
+ * Method: __fetch_get
* Signature: (J)Lorg/apache/mesos/state/Variable;
*/
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1get
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1get
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
future->await();
@@ -227,7 +242,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
CHECK(future->isReady());
- Variable<string>* variable = new Variable<string>(future->get());
+ Variable* variable = new Variable(future->get());
// Variable variable = new Variable();
jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -244,13 +259,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_get_timeout
+ * Method: __fetch_get_timeout
* Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
*/
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1get_1timeout
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1get_1timeout
(JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
jclass clazz = env->GetObjectClass(junit);
@@ -273,7 +288,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
}
CHECK(future->isReady());
- Variable<string>* variable = new Variable<string>(future->get());
+ Variable* variable = new Variable(future->get());
// Variable variable = new Variable();
clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -296,13 +311,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __get_finalize
+ * Method: __fetch_finalize
* Signature: (J)V
*/
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1finalize
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1finalize
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+ Future<Variable>* future = (Future<Variable>*) jfuture;
delete future;
}
@@ -310,27 +325,26 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1final
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set
+ * Method: __store
* Signature: (Lorg/apache/mesos/state/Variable;)J
*/
-JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store
(JNIEnv* env, jobject thiz, jobject jvariable)
{
jclass clazz = env->GetObjectClass(jvariable);
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- Variable<string>* variable = (Variable<string>*)
- env->GetLongField(jvariable, __variable);
+ Variable* variable = (Variable*) env->GetLongField(jvariable, __variable);
clazz = env->GetObjectClass(thiz);
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
- State<>* state = (State<>*) env->GetLongField(thiz, __state);
+ State* state = (State*) env->GetLongField(thiz, __state);
- Future<Option<Variable<string> > >* future =
- new Future<Option<Variable<string> > >(state->set(*variable));
+ Future<Option<Variable> >* future =
+ new Future<Option<Variable> >(state->store(*variable));
return (jlong) future;
}
@@ -338,14 +352,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_cancel
+ * Method: __store_cancel
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1cancel
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1cancel
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
if (!future->isDiscarded()) {
future->discard();
@@ -358,14 +371,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1c
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_is_cancelled
+ * Method: __store_is_cancelled
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1cancelled
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
return (jboolean) future->isDiscarded();
}
@@ -373,14 +385,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1i
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_is_done
+ * Method: __store_is_done
* Signature: (J)Z
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1done
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1is_1done
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
return (jboolean) !future->isPending();
}
@@ -388,14 +399,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1i
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_get
+ * Method: __store_get
* Signature: (J)Lorg/apache/mesos/state/Variable;
*/
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1get
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1get
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
future->await();
@@ -412,7 +422,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
CHECK(future->isReady());
if (future->get().isSome()) {
- Variable<string>* variable = new Variable<string>(future->get().get());
+ Variable* variable = new Variable(future->get().get());
// Variable variable = new Variable();
jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -432,14 +442,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_get_timeout
+ * Method: __store_get_timeout
* Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
*/
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1get_1timeout
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1get_1timeout
(JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
jclass clazz = env->GetObjectClass(junit);
@@ -464,7 +473,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
CHECK(future->isReady());
if (future->get().isSome()) {
- Variable<string>* variable = new Variable<string>(future->get().get());
+ Variable* variable = new Variable(future->get().get());
// Variable variable = new Variable();
clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -490,14 +499,186 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
/*
* Class: org_apache_mesos_state_ZooKeeperState
- * Method: __set_finalize
+ * Method: __store_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1finalize
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
+
+ delete future;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge
+ * Signature: (Lorg/apache/mesos/state/Variable;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge
+ (JNIEnv* env, jobject thiz, jobject jvariable)
+{
+ jclass clazz = env->GetObjectClass(jvariable);
+
+ jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+
+ Variable* variable = (Variable*) env->GetLongField(jvariable, __variable);
+
+ clazz = env->GetObjectClass(thiz);
+
+ jfieldID __state = env->GetFieldID(clazz, "__state", "J");
+
+ State* state = (State*) env->GetLongField(thiz, __state);
+
+ Future<bool>* future = new Future<bool>(state->expunge(*variable));
+
+ return (jlong) future;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1cancel
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<bool>* future = (Future<bool>*) jfuture;
+
+ if (!future->isDiscarded()) {
+ future->discard();
+ return (jboolean) future->isDiscarded();
+ }
+
+ return (jboolean) true;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1is_1cancelled
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<bool>* future = (Future<bool>*) jfuture;
+
+ return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1is_1done
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<bool>* future = (Future<bool>*) jfuture;
+
+ return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_get
+ * Signature: (J)Ljava/lang/Boolean;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1get
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<bool>* future = (Future<bool>*) jfuture;
+
+ future->await();
+
+ if (future->isFailed()) {
+ jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
+ jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
+ env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
+ }
+
+ CHECK(future->isReady());
+
+ if (future->get()) {
+ jclass clazz = env->FindClass("java/lang/Boolean");
+ jfieldID TRUE = env->GetStaticFieldID(clazz, "TRUE", "Ljava/lang/Boolean;");
+ return env->GetStaticObjectField(clazz, TRUE);
+ }
+
+ jclass clazz = env->FindClass("java/lang/Boolean");
+ jfieldID FALSE = env->GetStaticFieldID(clazz, "FALSE", "Ljava/lang/Boolean;");
+ return env->GetStaticObjectField(clazz, FALSE);
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_get_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Ljava/lang/Boolean;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1get_1timeout
+ (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
+{
+ Future<bool>* future = (Future<bool>*) jfuture;
+
+ jclass clazz = env->GetObjectClass(junit);
+
+ // long seconds = unit.toSeconds(time);
+ jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+ jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+ Seconds seconds(jseconds);
+
+ if (future->await(seconds)) {
+ if (future->isFailed()) {
+ clazz = env->FindClass("java/util/concurrent/ExecutionException");
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
+ clazz = env->FindClass("java/util/concurrent/CancellationException");
+ env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
+ }
+
+ CHECK(future->isReady());
+
+ if (future->get()) {
+ jclass clazz = env->FindClass("java/lang/Boolean");
+ jfieldID TRUE = env->GetStaticFieldID(clazz, "TRUE", "Ljava/lang/Boolean;");
+ return env->GetStaticObjectField(clazz, TRUE);
+ }
+
+ jclass clazz = env->FindClass("java/lang/Boolean");
+ jfieldID FALSE = env->GetStaticFieldID(clazz, "FALSE", "Ljava/lang/Boolean;");
+ return env->GetStaticObjectField(clazz, FALSE);
+ }
+
+ clazz = env->FindClass("java/util/concurrent/TimeoutException");
+ env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+ return NULL;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __expunge_finalize
* Signature: (J)V
*/
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1finalize
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1finalize
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable<string> > >* future =
- (Future<Option<Variable<string> > >*) jfuture;
+ Future<bool>* future = (Future<bool>*) jfuture;
delete future;
}
@@ -515,9 +696,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1names
jfieldID __state = env->GetFieldID(clazz, "__state", "J");
- State<>* state = (State<>*) env->GetLongField(thiz, __state);
+ State* state = (State*) env->GetLongField(thiz, __state);
- Future<vector<string> >* future = new Future<vector<string> >(state->names());
+ Future<vector<string> >* future =
+ new Future<vector<string> >(state->names());
return (jlong) future;
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/InMemoryState.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/InMemoryState.java b/src/java/src/org/apache/mesos/state/InMemoryState.java
index 5de921e..7addd2d 100644
--- a/src/java/src/org/apache/mesos/state/InMemoryState.java
+++ b/src/java/src/org/apache/mesos/state/InMemoryState.java
@@ -32,7 +32,7 @@ import java.util.concurrent.FutureTask;
*/
public class InMemoryState implements State {
@Override
- public Future<Variable> get(String name) {
+ public Future<Variable> fetch(String name) {
Entry entry = entries.get(name); // Is null if doesn't exist.
if (entry == null) {
@@ -40,17 +40,21 @@ public class InMemoryState implements State {
entry.name = name;
entry.uuid = UUID.randomUUID();
entry.value = new byte[0];
- entries.put(name, entry);
- entry = entries.putIfAbsent(name, entry);
+
+ // We use 'putIfAbsent' because multiple threads might be
+ // attempting to fetch a "new" variable at the same time.
+ if (entries.putIfAbsent(name, entry) != null) {
+ return fetch(name);
+ }
}
- assert entry != null; // ConcurrentMap.putIfAbsent should not return null.
+ assert entry != null;
return futureFrom((Variable) new InMemoryVariable(entry));
}
@Override
- public Future<Variable> set(Variable v) {
+ public Future<Variable> store(Variable v) {
InMemoryVariable variable = (InMemoryVariable) v;
Entry entry = new Entry();
@@ -66,6 +70,13 @@ public class InMemoryState implements State {
}
@Override
+ public Future<Boolean> expunge(Variable v) {
+ InMemoryVariable variable = (InMemoryVariable) v;
+
+ return futureFrom(entries.remove(variable.entry.name, variable.entry));
+ }
+
+ @Override
public Future<Iterator<String>> names() {
return futureFrom(entries.keySet().iterator());
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/State.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/State.java b/src/java/src/org/apache/mesos/state/State.java
index 81b8685..dccb0ff 100644
--- a/src/java/src/org/apache/mesos/state/State.java
+++ b/src/java/src/org/apache/mesos/state/State.java
@@ -30,18 +30,18 @@ import java.util.concurrent.Future;
* fetched. Varying implementations of state provide varying
* replicated guarantees.
*
- * Note that the semantics of 'get' and 'set' provide atomicity. That
- * is, you can not set a variable that has changed since you did the
- * last get. That is, if a set succeeds then no other writes have been
- * performed on the variable since your get.
+ * Note that the semantics of 'fetch' and 'store' provide
+ * atomicity. That is, you can not store a variable that has changed
+ * since you did the last fetch. That is, if a store succeeds then no
+ * other writes have been performed on the variable since your fetch.
*
* Example:
*
* State state = new ZooKeeperState();
- * Future<Variable> variable = state.get("machines");
+ * Future<Variable> variable = state.fetch("machines");
* Variable machines = variable.get();
* machines = machines.mutate(...);
- * variable = state.set(machines);
+ * variable = state.store(machines);
* machines = variable.get();
*/
public interface State {
@@ -49,14 +49,20 @@ public interface State {
* Returns an immutable "variable" representing the current value
* from the state associated with the specified name.
*/
- Future<Variable> get(String name);
+ Future<Variable> fetch(String name);
/**
* Returns an immutable "variable" representing the current value in
* the state if updating the specified variable in the state was
* successful, otherwise returns null.
*/
- Future<Variable> set(Variable variable);
+ Future<Variable> store(Variable variable);
+
+ /**
+ * Returns true if successfully expunged the variable from the state
+ * or false if the variable did not exist or was no longer valid.
+ */
+ Future<Boolean> expunge(Variable variable);
/**
* Returns an iterator of variable names in the state.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/ZooKeeperState.java b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
index 1ef591a..999d593 100644
--- a/src/java/src/org/apache/mesos/state/ZooKeeperState.java
+++ b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
@@ -73,86 +73,125 @@ public class ZooKeeperState implements State {
}
@Override
- public Future<Variable> get(final String name) {
- final long future = __get(name); // Asynchronously start the operation.
+ public Future<Variable> fetch(final String name) {
+ final long future = __fetch(name); // Asynchronously start the operation.
return new Future<Variable>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning) {
- return __get_cancel(future);
+ return __fetch_cancel(future);
}
return false; // Should not interrupt and already running (or finished).
}
@Override
public boolean isCancelled() {
- return __get_is_cancelled(future);
+ return __fetch_is_cancelled(future);
}
@Override
public boolean isDone() {
- return __get_is_done(future);
+ return __fetch_is_done(future);
}
@Override
public Variable get() throws InterruptedException, ExecutionException {
- return __get_get(future);
+ return __fetch_get(future);
}
@Override
public Variable get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return __get_get_timeout(future, timeout, unit);
+ return __fetch_get_timeout(future, timeout, unit);
}
@Override
protected void finalize() {
- __get_finalize(future);
+ __fetch_finalize(future);
}
};
}
@Override
- public Future<Variable> set(Variable variable) {
- final long future = __set(variable); // Asynchronously start the operation.
+ public Future<Variable> store(Variable variable) {
+ final long future = __store(variable); // Asynchronously start the operation.
return new Future<Variable>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (mayInterruptIfRunning) {
- return __set_cancel(future);
+ return __store_cancel(future);
}
return false; // Should not interrupt and already running (or finished).
}
@Override
public boolean isCancelled() {
- return __set_is_cancelled(future);
+ return __store_is_cancelled(future);
}
@Override
public boolean isDone() {
- return __set_is_done(future);
+ return __store_is_done(future);
}
@Override
public Variable get() throws InterruptedException, ExecutionException {
- return __set_get(future);
+ return __store_get(future);
}
@Override
public Variable get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
- return __set_get_timeout(future, timeout, unit);
+ return __store_get_timeout(future, timeout, unit);
}
@Override
protected void finalize() {
- __set_finalize(future);
+ __store_finalize(future);
}
};
}
@Override
+ public Future<Boolean> expunge(Variable variable) {
+ final long future = __expunge(variable); // Asynchronously start the operation.
+ return new Future<Boolean>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (mayInterruptIfRunning) {
+ return __expunge_cancel(future);
+ }
+ return false; // Should not interrupt and already running (or finished).
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return __expunge_is_cancelled(future);
+ }
+
+ @Override
+ public boolean isDone() {
+ return __expunge_is_done(future);
+ }
+
+ @Override
+ public Boolean get() throws InterruptedException, ExecutionException {
+ return __expunge_get(future);
+ }
+
+ @Override
+ public Boolean get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return __expunge_get_timeout(future, timeout, unit);
+ }
+
+ @Override
+ protected void finalize() {
+ __expunge_finalize(future);
+ }
+ };
+ }
+
public Future<Iterator<String>> names() {
final long future = __names(); // Asynchronously start the operation.
return new Future<Iterator<String>>() {
@@ -206,24 +245,33 @@ public class ZooKeeperState implements State {
protected native void finalize();
- // Native implementations of get, set, and names.
- private native long __get(String name);
- private native boolean __get_cancel(long future);
- private native boolean __get_is_cancelled(long future);
- private native boolean __get_is_done(long future);
- private native Variable __get_get(long future);
- private native Variable __get_get_timeout(
+ // Native implementations of 'fetch', 'store', 'expunge', and 'names'.
+ private native long __fetch(String name);
+ private native boolean __fetch_cancel(long future);
+ private native boolean __fetch_is_cancelled(long future);
+ private native boolean __fetch_is_done(long future);
+ private native Variable __fetch_get(long future);
+ private native Variable __fetch_get_timeout(
+ long future, long timeout, TimeUnit unit);
+ private native void __fetch_finalize(long future);
+
+ private native long __store(Variable variable);
+ private native boolean __store_cancel(long future);
+ private native boolean __store_is_cancelled(long future);
+ private native boolean __store_is_done(long future);
+ private native Variable __store_get(long future);
+ private native Variable __store_get_timeout(
long future, long timeout, TimeUnit unit);
- private native void __get_finalize(long future);
-
- private native long __set(Variable variable);
- private native boolean __set_cancel(long future);
- private native boolean __set_is_cancelled(long future);
- private native boolean __set_is_done(long future);
- private native Variable __set_get(long future);
- private native Variable __set_get_timeout(
+ private native void __store_finalize(long future);
+
+ private native long __expunge(Variable variable);
+ private native boolean __expunge_cancel(long future);
+ private native boolean __expunge_is_cancelled(long future);
+ private native boolean __expunge_is_done(long future);
+ private native Boolean __expunge_get(long future);
+ private native Boolean __expunge_get_timeout(
long future, long timeout, TimeUnit unit);
- private native void __set_finalize(long future);
+ private native void __expunge_finalize(long future);
private native long __names();
private native boolean __names_cancel(long future);
@@ -234,5 +282,6 @@ public class ZooKeeperState implements State {
long future, long timeout, TimeUnit unit);
private native void __names_finalize(long future);
+ private long __storage;
private long __state;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/master/registry.hpp
----------------------------------------------------------------------
diff --git a/src/master/registry.hpp b/src/master/registry.hpp
new file mode 100644
index 0000000..d3cbe56
--- /dev/null
+++ b/src/master/registry.hpp
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_REGISTRY_HPP__
+#define __MASTER_REGISTRY_HPP__
+
+#include "master/registry.pb.h"
+
+#endif // __MASTER_REGISTRY_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
new file mode 100644
index 0000000..877bfa1
--- /dev/null
+++ b/src/master/registry.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos.proto";
+
+package mesos.internal.registry;
+
+message Slave {
+ // TODO(benh): Add other information here that is internal to Mesos
+ // and shouldn't be exposed in SlaveInfo.
+ required SlaveInfo info = 2;
+}
+
+
+message Registry {
+ repeated Slave slaves = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 285cc65..98038c0 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -19,6 +19,46 @@
#ifndef __MESSAGES_HPP__
#define __MESSAGES_HPP__
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <stout/try.hpp>
+
#include "messages/messages.pb.h"
+namespace messages {
+
+template <typename T>
+Try<T> deserialize(const std::string& value)
+{
+ T t;
+ (void) static_cast<google::protobuf::Message*>(&t);
+
+ google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+ if (!t.ParseFromZeroCopyStream(&stream)) {
+ return Try<T>::error(
+ "Failed to deserialize " + t.GetDescriptor()->full_name());
+ }
+ return t;
+}
+
+
+template <typename T>
+Try<std::string> serialize(const T& t)
+{
+ (void) static_cast<const google::protobuf::Message*>(&t);
+
+ std::string value;
+ if (!t.SerializeToString(&value)) {
+ return Try<std::string>::error(
+ "Failed to serialize " + t.GetDescriptor()->full_name());
+ }
+ return value;
+}
+
+} // namespace messages {
+
#endif // __MESSAGES_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 3f02474..2c196ee 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -73,12 +73,6 @@ message StatusUpdateRecord {
}
-message Slaves
-{
- repeated SlaveInfo infos = 1;
-}
-
-
message SubmitSchedulerRequest
{
required string name = 1;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 5b3fe91..69d5714 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -2,6 +2,8 @@
#include <google/protobuf/message.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
#include <string>
#include <vector>
@@ -20,7 +22,7 @@
#include "messages/state.hpp"
#include "state/leveldb.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
using namespace process;
@@ -31,17 +33,17 @@ namespace mesos {
namespace internal {
namespace state {
-LevelDBStateProcess::LevelDBStateProcess(const string& _path)
+LevelDBStorageProcess::LevelDBStorageProcess(const string& _path)
: path(_path), db(NULL) {}
-LevelDBStateProcess::~LevelDBStateProcess()
+LevelDBStorageProcess::~LevelDBStorageProcess()
{
- delete db; // Might be null if open failed in LevelDBStateProcess::initialize.
+ delete db; // NULL if open failed in LevelDBStorageProcess::initialize.
}
-void LevelDBStateProcess::initialize()
+void LevelDBStorageProcess::initialize()
{
leveldb::Options options;
options.create_if_missing = true;
@@ -58,7 +60,7 @@ void LevelDBStateProcess::initialize()
}
-Future<vector<string> > LevelDBStateProcess::names()
+Future<vector<string> > LevelDBStorageProcess::names()
{
if (error.isSome()) {
return Future<vector<string> >::failed(error.get());
@@ -81,13 +83,13 @@ Future<vector<string> > LevelDBStateProcess::names()
}
-Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
+Future<Option<Entry> > LevelDBStorageProcess::get(const string& name)
{
if (error.isSome()) {
return Future<Option<Entry> >::failed(error.get());
}
- Try<Option<Entry> > option = get(name);
+ Try<Option<Entry> > option = read(name);
if (option.isError()) {
return Future<Option<Entry> >::failed(option.error());
@@ -97,16 +99,16 @@ Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
}
-Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
+Future<bool> LevelDBStorageProcess::set(const Entry& entry, const UUID& uuid)
{
if (error.isSome()) {
return Future<bool>::failed(error.get());
}
- // We do a fetch first to make sure the version has not changed. This
+ // We do a read first to make sure the version has not changed. This
// could be optimized in the future, for now it will probably hit
// the cache anyway.
- Try<Option<Entry> > option = get(entry.name());
+ Try<Option<Entry> > option = read(entry.name());
if (option.isError()) {
return Future<bool>::failed(option.error());
@@ -118,11 +120,11 @@ Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
}
}
- // Note that there is no need to do the DB::Get and DB::Put
- // "atomically" because only one db can be opened at a time, so
- // there can not be any writes that occur concurrently.
+ // Note that the read (i.e., DB::Get) and the write (i.e., DB::Put)
+ // are inherently "atomic" because only one db can be opened at a
+ // time, so there can not be any writes that occur concurrently.
- Try<bool> result = put(entry);
+ Try<bool> result = write(entry);
if (result.isError()) {
return Future<bool>::failed(result.error());
@@ -132,7 +134,48 @@ Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
}
-Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
+Future<bool> LevelDBStorageProcess::expunge(const Entry& entry)
+{
+ if (error.isSome()) {
+ return Future<bool>::failed(error.get());
+ }
+
+ // We do a read first to make sure the version has not changed. This
+ // could be optimized in the future, for now it will probably hit
+ // the cache anyway.
+ Try<Option<Entry> > option = read(entry.name());
+
+ if (option.isError()) {
+ return Future<bool>::failed(option.error());
+ }
+
+ if (option.get().isNone()) {
+ return false;
+ }
+
+ if (UUID::fromBytes(option.get().get().uuid()) !=
+ UUID::fromBytes(entry.uuid())) {
+ return false;
+ }
+
+ // Note that the read (i.e., DB::Get) and DB::Delete are inherently
+ // "atomic" because only one db can be opened at a time, so there
+ // can not be any writes that occur concurrently.
+
+ leveldb::WriteOptions options;
+ options.sync = true;
+
+ leveldb::Status status = db->Delete(options, entry.name());
+
+ if (!status.ok()) {
+ return Future<bool>::failed(status.ToString());
+ }
+
+ return true;
+}
+
+
+Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
{
CHECK(error.isNone());
@@ -160,7 +203,7 @@ Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
}
-Try<bool> LevelDBStateProcess::put(const Entry& entry)
+Try<bool> LevelDBStorageProcess::write(const Entry& entry)
{
CHECK(error.isNone());
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.hpp b/src/state/leveldb.hpp
index 94f306b..14a94cc 100644
--- a/src/state/leveldb.hpp
+++ b/src/state/leveldb.hpp
@@ -14,8 +14,7 @@
#include "messages/state.hpp"
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
// Forward declarations.
namespace leveldb { class DB; }
@@ -26,46 +25,44 @@ namespace internal {
namespace state {
// More forward declarations.
-class LevelDBStateProcess;
+class LevelDBStorageProcess;
-template <typename Serializer = StringSerializer>
-class LevelDBState : public State<Serializer>
+class LevelDBStorage : public Storage
{
public:
- LevelDBState(const std::string& path);
- virtual ~LevelDBState();
+ LevelDBStorage(const std::string& path);
+ virtual ~LevelDBStorage();
- // State implementation.
+ // Storage implementation.
+ virtual process::Future<Option<Entry> > get(const std::string& name);
+ virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+ virtual process::Future<bool> expunge(const Entry& entry);
virtual process::Future<std::vector<std::string> > names();
-protected:
- // More State implementation.
- virtual process::Future<Option<Entry> > fetch(const std::string& name);
- virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
private:
- LevelDBStateProcess* process;
+ LevelDBStorageProcess* process;
};
-class LevelDBStateProcess : public process::Process<LevelDBStateProcess>
+class LevelDBStorageProcess : public process::Process<LevelDBStorageProcess>
{
public:
- LevelDBStateProcess(const std::string& path);
- virtual ~LevelDBStateProcess();
+ LevelDBStorageProcess(const std::string& path);
+ virtual ~LevelDBStorageProcess();
virtual void initialize();
- // State implementation.
+ // Storage implementation.
+ process::Future<Option<Entry> > get(const std::string& name);
+ process::Future<bool> set(const Entry& entry, const UUID& uuid);
+ process::Future<bool> expunge(const Entry& entry);
process::Future<std::vector<std::string> > names();
- process::Future<Option<Entry> > fetch(const std::string& name);
- process::Future<bool> swap(const Entry& entry, const UUID& uuid);
private:
// Helpers for interacting with leveldb.
- Try<Option<Entry> > get(const std::string& name);
- Try<bool> put(const Entry& entry);
+ Try<Option<Entry> > read(const std::string& name);
+ Try<bool> write(const Entry& entry);
const std::string path;
leveldb::DB* db;
@@ -74,16 +71,14 @@ private:
};
-template <typename Serializer>
-LevelDBState<Serializer>::LevelDBState(const std::string& path)
+inline LevelDBStorage::LevelDBStorage(const std::string& path)
{
- process = new LevelDBStateProcess(path);
+ process = new LevelDBStorageProcess(path);
process::spawn(process);
}
-template <typename Serializer>
-LevelDBState<Serializer>::~LevelDBState()
+inline LevelDBStorage::~LevelDBStorage()
{
process::terminate(process);
process::wait(process);
@@ -91,27 +86,31 @@ LevelDBState<Serializer>::~LevelDBState()
}
-template <typename Serializer>
-process::Future<std::vector<std::string> > LevelDBState<Serializer>::names()
+inline process::Future<Option<Entry> > LevelDBStorage::get(
+ const std::string& name)
{
- return process::dispatch(process, &LevelDBStateProcess::names);
+ return process::dispatch(process, &LevelDBStorageProcess::get, name);
}
-template <typename Serializer>
-process::Future<Option<Entry> > LevelDBState<Serializer>::fetch(
- const std::string& name)
+inline process::Future<bool> LevelDBStorage::set(
+ const Entry& entry,
+ const UUID& uuid)
{
- return process::dispatch(process, &LevelDBStateProcess::fetch, name);
+ return process::dispatch(process, &LevelDBStorageProcess::set, entry, uuid);
}
-template <typename Serializer>
-process::Future<bool> LevelDBState<Serializer>::swap(
- const Entry& entry,
- const UUID& uuid)
+inline process::Future<bool> LevelDBStorage::expunge(
+ const Entry& entry)
+{
+ return process::dispatch(process, &LevelDBStorageProcess::expunge, entry);
+}
+
+
+inline process::Future<std::vector<std::string> > LevelDBStorage::names()
{
- return process::dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+ return process::dispatch(process, &LevelDBStorageProcess::names);
}
} // namespace state {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/protobuf.hpp
----------------------------------------------------------------------
diff --git a/src/state/protobuf.hpp b/src/state/protobuf.hpp
new file mode 100644
index 0000000..75e082b
--- /dev/null
+++ b/src/state/protobuf.hpp
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_PROTOBUF_HPP__
+#define __STATE_PROTOBUF_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/messages.hpp"
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+#include "state/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+namespace protobuf {
+
+template <typename T>
+class Variable
+{
+public:
+ T get() const
+ {
+ return t;
+ }
+
+ Variable mutate(const T& t) const
+ {
+ Variable variable(*this);
+ variable.t = t;
+ return variable;
+ }
+
+private:
+ friend class State; // Creates and manages variables.
+
+ Variable(const state::Variable& _variable, const T& _t)
+ : variable(_variable), t(_t)
+ {}
+
+ state::Variable variable; // Not const to keep Variable assignable.
+ T t;
+};
+
+
+class State : public state::State
+{
+public:
+ State(Storage* storage) : state::State(storage) {}
+ virtual ~State() {}
+
+ // Returns a variable from the state, creating a new one if one
+ // previously did not exist (or an error if one occurs).
+ template <typename T>
+ process::Future<Variable<T> > fetch(const std::string& name);
+
+ // Returns the variable specified if it was successfully stored in
+ // the state, otherwise returns none if the version of the variable
+ // was no longer valid, or an error if one occurs.
+ template <typename T>
+ process::Future<Option<Variable<T> > > store(const Variable<T>& variable);
+
+ // Expunges the variable from the state.
+ template <typename T>
+ process::Future<bool> expunge(const Variable<T>& variable);
+
+private:
+ // Helpers to handle future results from fetch and swap. We make
+ // these static members of State for friend access to Variable's
+ // constructor.
+ template <typename T>
+ static process::Future<Variable<T> > _fetch(
+ const state::Variable& option);
+
+ template <typename T>
+ static process::Future<Option<Variable<T> > > _store(
+ const T& t,
+ const Option<state::Variable>& variable);
+};
+
+
+template <typename T>
+process::Future<Variable<T> > State::fetch(const std::string& name)
+{
+ return state::State::fetch(name)
+ .then(lambda::bind(&State::template _fetch<T>, lambda::_1));
+}
+
+
+template <typename T>
+process::Future<Variable<T> > State::_fetch(
+ const state::Variable& variable)
+{
+ Try<T> t = messages::deserialize<T>(variable.value());
+ if (t.isError()) {
+ return process::Future<Variable<T> >::failed(t.error());
+ }
+
+ return Variable<T>(variable, t.get());
+}
+
+
+template <typename T>
+process::Future<Option<Variable<T> > > State::store(
+ const Variable<T>& variable)
+{
+ Try<std::string> value = messages::serialize(variable.t);
+
+ if (value.isError()) {
+ return process::Future<Option<Variable<T> > >::failed(value.error());
+ }
+
+ return state::State::store(variable.variable.mutate(value.get()))
+ .then(lambda::bind(&State::template _store<T>, variable.t, lambda::_1));
+}
+
+
+template <typename T>
+process::Future<Option<Variable<T> > > State::_store(
+ const T& t,
+ const Option<state::Variable>& variable)
+{
+ if (variable.isSome()) {
+ return Option<Variable<T> >::some(Variable<T>(variable.get(), t));
+ }
+
+ return None();
+}
+
+
+template <typename T>
+process::Future<bool> State::expunge(const Variable<T>& variable)
+{
+ return state::State::expunge(variable.variable);
+}
+
+} // namespace protobuf {
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_PROTOBUF_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/serializer.hpp
----------------------------------------------------------------------
diff --git a/src/state/serializer.hpp b/src/state/serializer.hpp
deleted file mode 100644
index bd8c5df..0000000
--- a/src/state/serializer.hpp
+++ /dev/null
@@ -1,67 +0,0 @@
-#ifndef __STATE_SERIALIZER_HPP__
-#define __STATE_SERIALIZER_HPP__
-
-#include <google/protobuf/message.h>
-
-#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
-
-#include <string>
-
-#include <stout/error.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace state {
-
-struct StringSerializer
-{
- template <typename T>
- static Try<std::string> deserialize(const std::string& value)
- {
- return value;
- }
-
- template <typename T>
- static Try<std::string> serialize(const std::string& value)
- {
- return value;
- }
-};
-
-
-struct ProtobufSerializer
-{
- template <typename T>
- static Try<T> deserialize(const std::string& value)
- {
- T t;
- (void)static_cast<google::protobuf::Message*>(&t);
-
- google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
- if (!t.ParseFromZeroCopyStream(&stream)) {
- return Error(
- "Failed to deserialize " + t.GetDescriptor()->full_name());
- }
- return t;
- }
-
- template <typename T>
- static Try<std::string> serialize(const T& t)
- {
- // TODO(benh): Actually store the descriptor so that we can verify
- // type information (and compatibility) when we deserialize.
- std::string value;
- if (!t.SerializeToString(&value)) {
- return Error(
- "Failed to serialize " + t.GetDescriptor()->full_name());
- }
- return value;
- }
-};
-
-} // namespace state {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __STATE_SERIALIZER_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/state.hpp
----------------------------------------------------------------------
diff --git a/src/state/state.hpp b/src/state/state.hpp
index c616e00..b3abf89 100644
--- a/src/state/state.hpp
+++ b/src/state/state.hpp
@@ -24,16 +24,15 @@
#include <process/future.hpp>
+#include <stout/lambda.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
#include <stout/try.hpp>
#include <stout/uuid.hpp>
-#include "logging/logging.hpp"
-
#include "messages/state.hpp"
-#include "state/serializer.hpp"
+#include "state/storage.hpp"
namespace mesos {
namespace internal {
@@ -46,187 +45,153 @@ namespace state {
// fetched. Varying implementations of state provide varying
// replicated guarantees.
//
-// Note that the semantics of 'get' and 'set' provide atomicity. That
-// is, you can not set a variable that has changed since you did the
-// last get. That is, if a set succeeds then no other writes have been
-// performed on the variable since your get.
-
+// Note that the semantics of 'fetch' and 'store' provide
+// atomicity. That is, you can not store a variable that has changed
+// since you did the last fetch. That is, if a store succeeds then no
+// other writes have been performed on the variable since your fetch.
+//
// Example:
-
-// State<ProtobufSerializer>* state = new ZooKeeperState<ProtobufSerializer>();
-// Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-// Variable<Slaves> slaves = variable.get();
-// slaves->add_infos()->MergeFrom(info);
-// Future<bool> set = state->set(&slaves);
+//
+// Storage* storage = new ZooKeeperStorage();
+// State* state = new State(storage);
+// Future<Variable> variable = state->fetch("slaves");
+// std::string value = update(variable.value());
+// variable = variable.mutate(value);
+// state->store(variable);
// Forward declarations.
-template <typename Serializer>
class State;
-class ZooKeeperStateProcess;
-template <typename T>
+// Wrapper around a state "entry" to force immutability.
class Variable
{
public:
- T* operator -> ()
+ std::string value() const
{
- return &t;
+ return entry.value();
+ }
+
+ Variable mutate(const std::string& value) const
+ {
+ Variable variable(*this);
+ variable.entry.set_value(value);
+ return variable;
}
private:
- template <typename Serializer>
friend class State; // Creates and manages variables.
- Variable(const Entry& _entry, const T& _t)
- : entry(_entry), t(_t)
+ Variable(const Entry& _entry)
+ : entry(_entry)
{}
- Entry entry; // Not const so Variable is copyable.
- T t;
+ Entry entry; // Not const to keep Variable assignable.
};
-template <typename Serializer = StringSerializer>
class State
{
public:
- State() {}
+ State(Storage* _storage) : storage(_storage) {}
virtual ~State() {}
// Returns a variable from the state, creating a new one if one
// previously did not exist (or an error if one occurs).
- template <typename T>
- process::Future<Variable<T> > get(const std::string& name);
+ process::Future<Variable> fetch(const std::string& name);
- // Returns true if the variable was successfully set in the state,
- // otherwise false if the version of the variable was no longer
- // valid (or an error if one occurs).
- template <typename T>
- process::Future<Option<Variable<T> > > set(const Variable<T>& variable);
+ // Returns the variable specified if it was successfully stored in
+ // the state, otherwise returns none if the version of the variable
+ // was no longer valid, or an error if one occurs.
+ process::Future<Option<Variable> > store(const Variable& variable);
- // Returns the collection of variable names in the state.
- virtual process::Future<std::vector<std::string> > names() = 0;
+ // Returns true if successfully expunged the variable from the state.
+ process::Future<bool> expunge(const Variable& variable);
-protected:
- // Fetch and swap state entries, factored out to allow State
- // implementations to be agnostic of Variable which is templated.
- virtual process::Future<Option<Entry> > fetch(const std::string& name) = 0;
- virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
+ // Returns the collection of variable names in the state.
+ process::Future<std::vector<std::string> > names();
private:
// Helpers to handle future results from fetch and swap. We make
// these static members of State for friend access to Variable's
// constructor.
- template <typename T>
- static process::Future<Variable<T> > _get(
+ static process::Future<Variable> _fetch(
const std::string& name,
const Option<Entry>& option);
- template <typename T>
- static process::Future<Option<Variable<T> > > _set(
+ static process::Future<Option<Variable> > _store(
const Entry& entry,
- const T& t,
const bool& b); // TODO(benh): Remove 'const &' after fixing libprocess.
+
+ Storage* storage;
};
-template <typename Serializer>
-template <typename T>
-process::Future<Variable<T> > State<Serializer>::_get(
+inline process::Future<Variable> State::fetch(const std::string& name)
+{
+ return storage->get(name)
+ .then(lambda::bind(&State::_fetch, name, lambda::_1));
+}
+
+
+inline process::Future<Variable> State::_fetch(
const std::string& name,
const Option<Entry>& option)
{
if (option.isSome()) {
- const Entry& entry = option.get();
-
- Try<T> t = Serializer::template deserialize<T>(entry.value());
-
- if (t.isError()) {
- return process::Future<Variable<T> >::failed(t.error());
- }
-
- return Variable<T>(entry, t.get());
+ return Variable(option.get());
}
- // Otherwise, construct a Variable out of a new Entry with a default
- // value for T (and a random UUID to start).
- T t;
+ // Otherwise, construct a Variable with a new Entry (with a random
+ // UUID and no value to start).
+ Entry entry;
+ entry.set_name(name);
+ entry.set_uuid(UUID::random().toBytes());
+
+ return Variable(entry);
+}
- Try<std::string> value = Serializer::template serialize<T>(t);
- if (value.isError()) {
- return process::Future<Variable<T> >::failed(value.error());
- }
+inline process::Future<Option<Variable> > State::store(const Variable& variable)
+{
+ // Note that we try and swap an entry even if the value didn't change!
+ UUID uuid = UUID::fromBytes(variable.entry.uuid());
+ // Create a new entry to replace the existing entry provided the
+ // UUID matches.
Entry entry;
- entry.set_name(name);
+ entry.set_name(variable.entry.name());
entry.set_uuid(UUID::random().toBytes());
- entry.set_value(value.get());
+ entry.set_value(variable.entry.value());
- return Variable<T>(entry, t);
+ return storage->set(entry, uuid)
+ .then(lambda::bind(&State::_store, entry, lambda::_1));
}
-template <typename Serializer>
-template <typename T>
-process::Future<Option<Variable<T> > > State<Serializer>::_set(
+inline process::Future<Option<Variable > > State::_store(
const Entry& entry,
- const T& t,
const bool& b) // TODO(benh): Remove 'const &' after fixing libprocess.
{
if (b) {
- return Option<Variable<T> >::some(Variable<T>(entry, t));
+ return Option<Variable>::some(Variable(entry));
}
return None();
}
-template <typename Serializer>
-template <typename T>
-process::Future<Variable<T> > State<Serializer>::get(const std::string& name)
+inline process::Future<bool> State::expunge(const Variable& variable)
{
- return fetch(name)
- .then(std::tr1::bind(&State<Serializer>::template _get<T>,
- name,
- std::tr1::placeholders::_1));
+ return storage->expunge(variable.entry);
}
-template <typename Serializer>
-template <typename T>
-process::Future<Option<Variable<T> > > State<Serializer>::set(
- const Variable<T>& variable)
+inline process::Future<std::vector<std::string> > State::names()
{
- Try<std::string> value = Serializer::template serialize<T>(variable.t);
-
- if (value.isError()) {
- return process::Future<Option<Variable<T> > >::failed(value.error());
- }
-
- // Note that we try and swap an entry even if the value didn't change!
- UUID uuid = UUID::fromBytes(variable.entry.uuid());
-
- // Create a new entry that should be replace the existing entry
- // provided the UUID matches.
- Entry entry;
- entry.set_name(variable.entry.name());
- entry.set_uuid(UUID::random().toBytes());
- entry.set_value(value.get());
-
- std::tr1::function<
- process::Future<Option<Variable<T> > >(const bool&)> _set =
- std::tr1::bind(&State<Serializer>::template _set<T>,
- entry,
- variable.t,
- std::tr1::placeholders::_1);
-
- return swap(entry, uuid).then(_set);
+ return storage->names();
}
-
-
} // namespace state {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/storage.hpp
----------------------------------------------------------------------
diff --git a/src/state/storage.hpp b/src/state/storage.hpp
new file mode 100644
index 0000000..a137075
--- /dev/null
+++ b/src/state/storage.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_STORAGE_HPP__
+#define __STATE_STORAGE_HPP__
+
+#include <string>
+#include <vector>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+class Storage
+{
+public:
+ Storage() {}
+ virtual ~Storage() {}
+
+ // Get and set state entries, factored out to allow Storage
+ // implementations to be agnostic of Variable. Note that set acts
+ // like a "test-and-set" by requiring the existing entry to have the
+ // specified UUID.
+ virtual process::Future<Option<Entry> > get(const std::string& name) = 0;
+ virtual process::Future<bool> set(const Entry& entry, const UUID& uuid) = 0;
+
+ // Returns true if successfully expunged the variable from the state.
+ virtual process::Future<bool> expunge(const Entry& entry) = 0;
+
+ // Returns the collection of variable names in the state.
+ virtual process::Future<std::vector<std::string> > names() = 0;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_STORAGE_HPP__
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index 07339ff..1801fce 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -1,5 +1,7 @@
#include <google/protobuf/message.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
#include <queue>
#include <string>
#include <vector>
@@ -21,7 +23,7 @@
#include "messages/state.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
#include "state/zookeeper.hpp"
#include "zookeeper/authentication.hpp"
@@ -53,7 +55,7 @@ void fail(queue<T*>* queue, const string& message)
}
-ZooKeeperStateProcess::ZooKeeperStateProcess(
+ZooKeeperStorageProcess::ZooKeeperStorageProcess(
const string& _servers,
const Duration& _timeout,
const string& _znode,
@@ -71,27 +73,27 @@ ZooKeeperStateProcess::ZooKeeperStateProcess(
{}
-ZooKeeperStateProcess::~ZooKeeperStateProcess()
+ZooKeeperStorageProcess::~ZooKeeperStorageProcess()
{
- fail(&pending.names, "No longer managing state");
- fail(&pending.fetches, "No longer managing state");
- fail(&pending.swaps, "No longer managing state");
+ fail(&pending.names, "No longer managing storage");
+ fail(&pending.gets, "No longer managing storage");
+ fail(&pending.sets, "No longer managing storage");
delete zk;
delete watcher;
}
-void ZooKeeperStateProcess::initialize()
+void ZooKeeperStorageProcess::initialize()
{
// Doing initialization here allows to avoid the race between
// instantiating the ZooKeeper instance and being spawned ourself.
- watcher = new ProcessWatcher<ZooKeeperStateProcess>(self());
+ watcher = new ProcessWatcher<ZooKeeperStorageProcess>(self());
zk = new ZooKeeper(servers, timeout, watcher);
}
-Future<vector<string> > ZooKeeperStateProcess::names()
+Future<vector<string> > ZooKeeperStorageProcess::names()
{
if (error.isSome()) {
return Future<vector<string> >::failed(error.get());
@@ -115,22 +117,22 @@ Future<vector<string> > ZooKeeperStateProcess::names()
}
-Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
+Future<Option<Entry> > ZooKeeperStorageProcess::get(const string& name)
{
if (error.isSome()) {
return Future<Option<Entry> >::failed(error.get());
} else if (state != CONNECTED) {
- Fetch* fetch = new Fetch(name);
- pending.fetches.push(fetch);
- return fetch->promise.future();
+ Get* get = new Get(name);
+ pending.gets.push(get);
+ return get->promise.future();
}
- Result<Option<Entry> > result = doFetch(name);
+ Result<Option<Entry> > result = doGet(name);
if (result.isNone()) { // Try again later.
- Fetch* fetch = new Fetch(name);
- pending.fetches.push(fetch);
- return fetch->promise.future();
+ Get* get = new Get(name);
+ pending.gets.push(get);
+ return get->promise.future();
} else if (result.isError()) {
return Future<Option<Entry> >::failed(result.error());
}
@@ -139,22 +141,46 @@ Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
}
-Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
+Future<bool> ZooKeeperStorageProcess::set(const Entry& entry, const UUID& uuid)
+{
+ if (error.isSome()) {
+ return Future<bool>::failed(error.get());
+ } else if (state != CONNECTED) {
+ Set* set = new Set(entry, uuid);
+ pending.sets.push(set);
+ return set->promise.future();
+ }
+
+ Result<bool> result = doSet(entry, uuid);
+
+ if (result.isNone()) { // Try again later.
+ Set* set = new Set(entry, uuid);
+ pending.sets.push(set);
+ return set->promise.future();
+ } else if (result.isError()) {
+ return Future<bool>::failed(result.error());
+ }
+
+ return result.get();
+}
+
+
+Future<bool> ZooKeeperStorageProcess::expunge(const Entry& entry)
{
if (error.isSome()) {
return Future<bool>::failed(error.get());
} else if (state != CONNECTED) {
- Swap* swap = new Swap(entry, uuid);
- pending.swaps.push(swap);
- return swap->promise.future();
+ Expunge* expunge = new Expunge(entry);
+ pending.expunges.push(expunge);
+ return expunge->promise.future();
}
- Result<bool> result = doSwap(entry, uuid);
+ Result<bool> result = doExpunge(entry);
if (result.isNone()) { // Try again later.
- Swap* swap = new Swap(entry, uuid);
- pending.swaps.push(swap);
- return swap->promise.future();
+ Expunge* expunge = new Expunge(entry);
+ pending.expunges.push(expunge);
+ return expunge->promise.future();
} else if (result.isError()) {
return Future<bool>::failed(result.error());
}
@@ -163,7 +189,7 @@ Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
}
-void ZooKeeperStateProcess::connected(bool reconnect)
+void ZooKeeperStorageProcess::connected(bool reconnect)
{
if (!reconnect) {
// Authenticate if necessary (and we are connected for the first
@@ -196,43 +222,43 @@ void ZooKeeperStateProcess::connected(bool reconnect)
delete names;
}
- while (!pending.fetches.empty()) {
- Fetch* fetch = pending.fetches.front();
- Result<Option<Entry> > result = doFetch(fetch->name);
+ while (!pending.gets.empty()) {
+ Get* get = pending.gets.front();
+ Result<Option<Entry> > result = doGet(get->name);
if (result.isNone()) {
return; // Try again later.
} else if (result.isError()) {
- fetch->promise.fail(result.error());
+ get->promise.fail(result.error());
} else {
- fetch->promise.set(result.get());
+ get->promise.set(result.get());
}
- pending.fetches.pop();
- delete fetch;
+ pending.gets.pop();
+ delete get;
}
- while (!pending.swaps.empty()) {
- Swap* swap = pending.swaps.front();
- Result<bool> result = doSwap(swap->entry, swap->uuid);
+ while (!pending.sets.empty()) {
+ Set* set = pending.sets.front();
+ Result<bool> result = doSet(set->entry, set->uuid);
if (result.isNone()) {
return; // Try again later.
} else if (result.isError()) {
- swap->promise.fail(result.error());
+ set->promise.fail(result.error());
} else {
- swap->promise.set(result.get());
+ set->promise.set(result.get());
}
- pending.swaps.pop();
- delete swap;
+ pending.sets.pop();
+ delete set;
}
}
-void ZooKeeperStateProcess::reconnecting()
+void ZooKeeperStorageProcess::reconnecting()
{
state = CONNECTING;
}
-void ZooKeeperStateProcess::expired()
+void ZooKeeperStorageProcess::expired()
{
state = DISCONNECTED;
@@ -243,25 +269,25 @@ void ZooKeeperStateProcess::expired()
}
-void ZooKeeperStateProcess::updated(const string& path)
+void ZooKeeperStorageProcess::updated(const string& path)
{
LOG(FATAL) << "Unexpected ZooKeeper event";
}
-void ZooKeeperStateProcess::created(const string& path)
+void ZooKeeperStorageProcess::created(const string& path)
{
LOG(FATAL) << "Unexpected ZooKeeper event";
}
-void ZooKeeperStateProcess::deleted(const string& path)
+void ZooKeeperStorageProcess::deleted(const string& path)
{
LOG(FATAL) << "Unexpected ZooKeeper event";
}
-Result<vector<string> > ZooKeeperStateProcess::doNames()
+Result<vector<string> > ZooKeeperStorageProcess::doNames()
{
// Get all children to determine current memberships.
vector<string> results;
@@ -284,7 +310,7 @@ Result<vector<string> > ZooKeeperStateProcess::doNames()
}
-Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
+Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
{
CHECK(error.isNone()) << ": " << error.get();
CHECK(state == CONNECTED);
@@ -317,7 +343,7 @@ Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
}
-Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
+Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry, const UUID& uuid)
{
CHECK(error.isNone()) << ": " << error.get();
CHECK(state == CONNECTED);
@@ -397,7 +423,7 @@ Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
return false;
}
- // Okay, do a set, we get atomic swap by requiring 'stat.version'.
+ // Okay, do the set, we get atomicity by requiring 'stat.version'.
code = zk->set(znode + "/" + entry.name(), data, stat.version);
if (code == ZBADVERSION) {
@@ -414,6 +440,57 @@ Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
return true;
}
+
+Result<bool> ZooKeeperStorageProcess::doExpunge(const Entry& entry)
+{
+ CHECK(error.isNone()) << ": " << error.get();
+ CHECK(state == CONNECTED);
+
+ string result;
+ Stat stat;
+
+ int code = zk->get(znode + "/" + entry.name(), false, &result, &stat);
+
+ if (code == ZNONODE) {
+ return false;
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return None(); // Try again later.
+ } else if (code != ZOK) {
+ return Error(
+ "Failed to get '" + znode + "/" + entry.name() +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+ Entry current;
+
+ if (!current.ParseFromZeroCopyStream(&stream)) {
+ return Error("Failed to deserialize Entry");
+ }
+
+ if (UUID::fromBytes(current.uuid()) != UUID::fromBytes(entry.uuid())) {
+ return false;
+ }
+
+ // Okay, do the remove, we get atomicity by requiring 'stat.version'.
+ code = zk->remove(znode + "/" + entry.name(), stat.version);
+
+ if (code == ZBADVERSION) {
+ return false;
+ } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ return None(); // Try again later.
+ } else if (code != ZOK) {
+ return Error(
+ "Failed to remove '" + znode + "/" + entry.name() +
+ "' in ZooKeeper: " + zk->message(code));
+ }
+
+ return true;
+}
+
} // namespace state {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.hpp b/src/state/zookeeper.hpp
index 1e332ed..90b6607 100644
--- a/src/state/zookeeper.hpp
+++ b/src/state/zookeeper.hpp
@@ -17,8 +17,7 @@
#include "messages/state.hpp"
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
#include "zookeeper/authentication.hpp"
#include "zookeeper/watcher.hpp"
@@ -29,51 +28,49 @@ namespace internal {
namespace state {
// Forward declarations.
-class ZooKeeperStateProcess;
+class ZooKeeperStorageProcess;
-template <typename Serializer = StringSerializer>
-class ZooKeeperState : public State<Serializer>
+class ZooKeeperStorage : public Storage
{
public:
// TODO(benh): Just take a zookeeper::URL.
- ZooKeeperState(
+ ZooKeeperStorage(
const std::string& servers,
const Duration& timeout,
const std::string& znode,
const Option<zookeeper::Authentication>& auth =
Option<zookeeper::Authentication>());
- virtual ~ZooKeeperState();
+ virtual ~ZooKeeperStorage();
- // State implementation.
+ // Storage implementation.
+ virtual process::Future<Option<Entry> > get(const std::string& name);
+ virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+ virtual process::Future<bool> expunge(const Entry& entry);
virtual process::Future<std::vector<std::string> > names();
-protected:
- // More State implementation.
- virtual process::Future<Option<Entry> > fetch(const std::string& name);
- virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
private:
- ZooKeeperStateProcess* process;
+ ZooKeeperStorageProcess* process;
};
-class ZooKeeperStateProcess : public process::Process<ZooKeeperStateProcess>
+class ZooKeeperStorageProcess : public process::Process<ZooKeeperStorageProcess>
{
public:
- ZooKeeperStateProcess(
+ ZooKeeperStorageProcess(
const std::string& servers,
const Duration& timeout,
const std::string& znode,
const Option<zookeeper::Authentication>& auth);
- virtual ~ZooKeeperStateProcess();
+ virtual ~ZooKeeperStorageProcess();
virtual void initialize();
- // State implementation.
+ // Storage implementation.
+ process::Future<Option<Entry> > get(const std::string& name);
+ process::Future<bool> set(const Entry& entry, const UUID& uuid);
+ virtual process::Future<bool> expunge(const Entry& entry);
process::Future<std::vector<std::string> > names();
- process::Future<Option<Entry> > fetch(const std::string& name);
- process::Future<bool> swap(const Entry& entry, const UUID& uuid);
// ZooKeeper events.
void connected(bool reconnect);
@@ -86,8 +83,9 @@ public:
private:
// Helpers for getting the names, fetching, and swapping.
Result<std::vector<std::string> > doNames();
- Result<Option<Entry> > doFetch(const std::string& name);
- Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+ Result<Option<Entry> > doGet(const std::string& name);
+ Result<bool> doSet(const Entry& entry, const UUID& uuid);
+ Result<bool> doExpunge(const Entry& entry);
const std::string servers;
const Duration timeout;
@@ -111,49 +109,56 @@ private:
process::Promise<std::vector<std::string> > promise;
};
- struct Fetch
+ struct Get
{
- Fetch(const std::string& _name)
+ Get(const std::string& _name)
: name(_name) {}
std::string name;
process::Promise<Option<Entry> > promise;
};
- struct Swap
+ struct Set
{
- Swap(const Entry& _entry, const UUID& _uuid)
+ Set(const Entry& _entry, const UUID& _uuid)
: entry(_entry), uuid(_uuid) {}
Entry entry;
UUID uuid;
process::Promise<bool> promise;
};
+ struct Expunge
+ {
+ Expunge(const Entry& _entry)
+ : entry(_entry) {}
+ Entry entry;
+ process::Promise<bool> promise;
+ };
+
// TODO(benh): Make pending a single queue of "operations" that can
// be "invoked" (C++11 lambdas would help).
struct {
std::queue<Names*> names;
- std::queue<Fetch*> fetches;
- std::queue<Swap*> swaps;
+ std::queue<Get*> gets;
+ std::queue<Set*> sets;
+ std::queue<Expunge*> expunges;
} pending;
Option<std::string> error;
};
-template <typename Serializer>
-ZooKeeperState<Serializer>::ZooKeeperState(
+inline ZooKeeperStorage::ZooKeeperStorage(
const std::string& servers,
const Duration& timeout,
const std::string& znode,
const Option<zookeeper::Authentication>& auth)
{
- process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+ process = new ZooKeeperStorageProcess(servers, timeout, znode, auth);
process::spawn(process);
}
-template <typename Serializer>
-ZooKeeperState<Serializer>::~ZooKeeperState()
+inline ZooKeeperStorage::~ZooKeeperStorage()
{
process::terminate(process);
process::wait(process);
@@ -161,27 +166,31 @@ ZooKeeperState<Serializer>::~ZooKeeperState()
}
-template <typename Serializer>
-process::Future<std::vector<std::string> > ZooKeeperState<Serializer>::names()
+inline process::Future<Option<Entry> > ZooKeeperStorage::get(
+ const std::string& name)
{
- return process::dispatch(process, &ZooKeeperStateProcess::names);
+ return process::dispatch(process, &ZooKeeperStorageProcess::get, name);
}
-template <typename Serializer>
-process::Future<Option<Entry> > ZooKeeperState<Serializer>::fetch(
- const std::string& name)
+inline process::Future<bool> ZooKeeperStorage::set(
+ const Entry& entry,
+ const UUID& uuid)
{
- return process::dispatch(process, &ZooKeeperStateProcess::fetch, name);
+ return process::dispatch(process, &ZooKeeperStorageProcess::set, entry, uuid);
}
-template <typename Serializer>
-process::Future<bool> ZooKeeperState<Serializer>::swap(
- const Entry& entry,
- const UUID& uuid)
+inline process::Future<bool> ZooKeeperStorage::expunge(
+ const Entry& entry)
+{
+ return process::dispatch(process, &ZooKeeperStorageProcess::expunge, entry);
+}
+
+
+inline process::Future<std::vector<std::string> > ZooKeeperStorage::names()
{
- return process::dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+ return process::dispatch(process, &ZooKeeperStorageProcess::names);
}
} // namespace state {