You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/05/26 18:57:47 UTC

[22/28] git commit: Refactored base 'State' implementation to be serialization agnostic and use a 'Storage' instance. Changed the LevelDB and ZooKeeper implementations to implement 'Storage' instead of 'State'. Provided a protobuf specific implementation

Refactored base 'State' implementation to be serialization agnostic
and use a 'Storage' instance. Changed the LevelDB and ZooKeeper
implementations to implement 'Storage' instead of 'State'. Provided a
protobuf specific implementation on top of 'State'. Updated code
(including JNI) and tests accordingly.

Review: https://reviews.apache.org/r/11308


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/33f4ff4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/33f4ff4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/33f4ff4b

Branch: refs/heads/master
Commit: 33f4ff4b138dbe11abd2014e1e2b44dd8444a0b3
Parents: 1e35e9e
Author: Benjamin Hindman <be...@twitter.com>
Authored: Mon Jan 14 15:45:29 2013 -0800
Committer: Benjamin Hindman <be...@twitter.com>
Committed: Sun May 26 09:28:36 2013 -0700

----------------------------------------------------------------------
 src/Makefile.am                                    |   20 +-
 src/java/jni/org_apache_mesos_state_Variable.cpp   |   22 +-
 .../jni/org_apache_mesos_state_ZooKeeperState.cpp  |  318 ++++++++++---
 .../src/org/apache/mesos/state/InMemoryState.java  |   21 +-
 src/java/src/org/apache/mesos/state/State.java     |   22 +-
 .../src/org/apache/mesos/state/ZooKeeperState.java |  113 +++--
 src/master/registry.hpp                            |   24 +
 src/master/registry.proto                          |   32 ++
 src/messages/messages.hpp                          |   40 ++
 src/messages/messages.proto                        |    6 -
 src/state/leveldb.cpp                              |   77 +++-
 src/state/leveldb.hpp                              |   77 ++--
 src/state/protobuf.hpp                             |  166 +++++++
 src/state/serializer.hpp                           |   67 ---
 src/state/state.hpp                                |  183 +++----
 src/state/storage.hpp                              |   60 +++
 src/state/zookeeper.cpp                            |  175 +++++--
 src/state/zookeeper.hpp                            |   99 ++--
 src/tests/state_tests.cpp                          |  383 ++++++++++-----
 19 files changed, 1330 insertions(+), 575 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 0f7794e..7c740fd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -129,6 +129,10 @@ STATE_PROTOS = messages/state.pb.cc messages/state.pb.h
 BUILT_SOURCES += $(STATE_PROTOS)
 CLEANFILES += $(STATE_PROTOS)
 
+REGISTRY_PROTOS = master/registry.pb.cc master/registry.pb.h
+
+BUILT_SOURCES += $(REGISTRY_PROTOS)
+CLEANFILES += $(REGISTRY_PROTOS)
 
 # Targets for generating protocol buffer code.
 %.pb.cc %.pb.h: $(top_srcdir)/include/mesos/%.proto
@@ -153,7 +157,10 @@ $(PYTHON_PROTOS): $(MESOS_PROTO)
 # libraries themselves.
 noinst_LTLIBRARIES += libmesos_no_third_party.la
 
-nodist_libmesos_no_third_party_la_SOURCES = $(CXX_PROTOS) $(MESSAGES_PROTOS)
+nodist_libmesos_no_third_party_la_SOURCES = 	\
+  $(CXX_PROTOS) 				\
+  $(MESSAGES_PROTOS) 				\
+  $(REGISTRY_PROTOS)
 
 libmesos_no_third_party_la_SOURCES =					\
 	sched/sched.cpp							\
@@ -162,6 +169,8 @@ libmesos_no_third_party_la_SOURCES =					\
 	master/drf_sorter.cpp						\
 	master/http.cpp							\
 	master/master.cpp						\
+	master/registry.hpp						\
+	master/registry.proto                                           \
 	slave/constants.cpp						\
 	slave/gc.cpp							\
 	slave/monitor.cpp						\
@@ -278,8 +287,13 @@ libmesos_no_third_party_la_LIBADD += liblog.la
 # include the leveldb headers.
 noinst_LTLIBRARIES += libstate.la
 libstate_la_SOURCES = state/leveldb.cpp state/zookeeper.cpp
-libstate_la_SOURCES += state/leveldb.hpp state/serializer.hpp	\
-  state/state.hpp state/zookeeper.hpp messages/state.hpp	\
+libstate_la_SOURCES += 			\
+  state/leveldb.hpp			\
+  state/protobuf.hpp			\
+  state/state.hpp 			\
+  state/storage.hpp 			\
+  state/zookeeper.hpp 			\
+  messages/state.hpp			\
   messages/state.proto
 nodist_libstate_la_SOURCES = $(STATE_PROTOS)
 libstate_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/jni/org_apache_mesos_state_Variable.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_Variable.cpp b/src/java/jni/org_apache_mesos_state_Variable.cpp
index eacb110..4d840ce 100644
--- a/src/java/jni/org_apache_mesos_state_Variable.cpp
+++ b/src/java/jni/org_apache_mesos_state_Variable.cpp
@@ -20,13 +20,13 @@ JNIEXPORT jbyteArray JNICALL Java_org_apache_mesos_state_Variable_value
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  Variable<std::string>* variable =
-    (Variable<std::string>*) env->GetLongField(thiz, __variable);
+  Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
+
+  const std::string& value = variable->value();
 
   // byte[] value = ..;
-  jbyteArray jvalue = env->NewByteArray((*variable)->size());
-  env->SetByteArrayRegion(
-      jvalue, 0, (*variable)->size(), (jbyte*) (*variable)->data());
+  jbyteArray jvalue = env->NewByteArray(value.size());
+  env->SetByteArrayRegion(jvalue, 0, value.size(), (jbyte*) value.data());
 
   return jvalue;
 }
@@ -44,15 +44,14 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_Variable_mutate
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  // Create a copy of the old variable to support the immutable Java API.
-  Variable<std::string>* variable = new Variable<std::string>(
-      *((Variable<std::string>*) env->GetLongField(thiz, __variable)));
+  Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
 
   jbyte* value = env->GetByteArrayElements(jvalue, NULL);
   jsize length = env->GetArrayLength(jvalue);
 
-  // Update the value of the new copy.
-  (*variable)->assign((const char*) value, length);
+  // Mutate the variable and save a copy of the result.
+  variable =
+    new Variable(variable->mutate(std::string((const char*) value, length)));
 
   env->ReleaseByteArrayElements(jvalue, value, 0);
 
@@ -80,8 +79,7 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_Variable_finalize
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  Variable<std::string>* variable =
-    (Variable<std::string>*) env->GetLongField(thiz, __variable);
+  Variable* variable = (Variable*) env->GetLongField(thiz, __variable);
 
   delete variable;
 }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
index 5abf3ef..c3282c0 100644
--- a/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
+++ b/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
@@ -48,11 +48,16 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
 
   string znode = construct<string>(env, jznode);
 
-   // Create the C++ State and initialize the __state variable.
-  State<>* state = new ZooKeeperState<>(servers, timeout, znode);
+   // Create the C++ Storage and State instances and initialize the
+   // __storage and __state variables.
+  Storage* storage = new ZooKeeperStorage(servers, timeout, znode);
+  State* state = new State(storage);
 
   clazz = env->GetObjectClass(thiz);
 
+  jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+  env->SetLongField(thiz, __storage, (jlong) storage);
+
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
   env->SetLongField(thiz, __state, (jlong) state);
 }
@@ -87,7 +92,7 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
   string znode = construct<string>(env, jznode);
 
   // Create the C++ State.
-  State<>* state = NULL;
+  Storage* storage = NULL;
   if (jscheme != NULL && jcredentials != NULL) {
     string scheme = construct<string>(env, jscheme);
 
@@ -100,16 +105,21 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_initialize__Lj
 
     zookeeper::Authentication authentication(scheme, credentials);
 
-    state = new ZooKeeperState<>(servers, timeout, znode, authentication);
+    storage = new ZooKeeperStorage(servers, timeout, znode, authentication);
   } else {
-    state = new ZooKeeperState<>(servers, timeout, znode);
+    storage = new ZooKeeperStorage(servers, timeout, znode);
   }
 
-  CHECK(state != NULL);
+  CHECK(storage != NULL);
+
+  State* state = new State(storage);
 
-  // Initialize the __state variable.
+  // Initialize the __storage and __state variables.
   clazz = env->GetObjectClass(thiz);
 
+  jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+  env->SetLongField(thiz, __storage, (jlong) storage);
+
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
   env->SetLongField(thiz, __state, (jlong) state);
 }
@@ -127,18 +137,24 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState_finalize
 
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
 
-  State<>* state = (State<>*) env->GetLongField(thiz, __state);
+  State* state = (State*) env->GetLongField(thiz, __state);
 
   delete state;
+
+  jfieldID __storage = env->GetFieldID(clazz, "__storage", "J");
+
+  Storage* storage = (Storage*) env->GetLongField(thiz, __storage);
+
+  delete storage;
 }
 
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get
+ * Method:    __fetch
  * Signature: (Ljava/lang/String;)J
  */
-JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch
   (JNIEnv* env, jobject thiz, jstring jname)
 {
   string name = construct<string>(env, jname);
@@ -147,10 +163,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
 
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
 
-  State<>* state = (State<>*) env->GetLongField(thiz, __state);
+  State* state = (State*) env->GetLongField(thiz, __state);
 
-  Future<Variable<string> >* future =
-    new Future<Variable<string> >(state->get<string>(name));
+  Future<Variable>* future = new Future<Variable>(state->fetch(name));
 
   return (jlong) future;
 }
@@ -158,13 +173,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_cancel
+ * Method:    __fetch_cancel
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1cancel
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1cancel
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   if (!future->isDiscarded()) {
     future->discard();
@@ -177,13 +192,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1c
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_is_cancelled
+ * Method:    __fetch_is_cancelled
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1cancelled
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1is_1cancelled
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   return (jboolean) future->isDiscarded();
 }
@@ -191,13 +206,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1i
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_is_done
+ * Method:    __fetch_is_done
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1done
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1is_1done
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   return (jboolean) !future->isPending();
 }
@@ -205,13 +220,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1i
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_get
+ * Method:    __fetch_get
  * Signature: (J)Lorg/apache/mesos/state/Variable;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1get
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1get
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   future->await();
 
@@ -227,7 +242,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
 
   CHECK(future->isReady());
 
-  Variable<string>* variable = new Variable<string>(future->get());
+  Variable* variable = new Variable(future->get());
 
   // Variable variable = new Variable();
   jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -244,13 +259,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_get_timeout
+ * Method:    __fetch_get_timeout
  * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1get_1timeout
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1get_1timeout
   (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   jclass clazz = env->GetObjectClass(junit);
 
@@ -273,7 +288,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
     }
 
     CHECK(future->isReady());
-    Variable<string>* variable = new Variable<string>(future->get());
+    Variable* variable = new Variable(future->get());
 
     // Variable variable = new Variable();
     clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -296,13 +311,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1ge
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __get_finalize
+ * Method:    __fetch_finalize
  * Signature: (J)V
  */
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1finalize
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1fetch_1finalize
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+  Future<Variable>* future = (Future<Variable>*) jfuture;
 
   delete future;
 }
@@ -310,27 +325,26 @@ JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1final
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set
+ * Method:    __store
  * Signature: (Lorg/apache/mesos/state/Variable;)J
  */
-JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store
   (JNIEnv* env, jobject thiz, jobject jvariable)
 {
   jclass clazz = env->GetObjectClass(jvariable);
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  Variable<string>* variable = (Variable<string>*)
-    env->GetLongField(jvariable, __variable);
+  Variable* variable = (Variable*) env->GetLongField(jvariable, __variable);
 
   clazz = env->GetObjectClass(thiz);
 
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
 
-  State<>* state = (State<>*) env->GetLongField(thiz, __state);
+  State* state = (State*) env->GetLongField(thiz, __state);
 
-  Future<Option<Variable<string> > >* future =
-    new Future<Option<Variable<string> > >(state->set(*variable));
+  Future<Option<Variable> >* future =
+    new Future<Option<Variable> >(state->store(*variable));
 
   return (jlong) future;
 }
@@ -338,14 +352,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_cancel
+ * Method:    __store_cancel
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1cancel
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1cancel
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
 
   if (!future->isDiscarded()) {
     future->discard();
@@ -358,14 +371,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1c
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_is_cancelled
+ * Method:    __store_is_cancelled
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1cancelled
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1is_1cancelled
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
 
   return (jboolean) future->isDiscarded();
 }
@@ -373,14 +385,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1i
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_is_done
+ * Method:    __store_is_done
  * Signature: (J)Z
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1done
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1is_1done
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
 
   return (jboolean) !future->isPending();
 }
@@ -388,14 +399,13 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1i
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_get
+ * Method:    __store_get
  * Signature: (J)Lorg/apache/mesos/state/Variable;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1get
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1get
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
 
   future->await();
 
@@ -412,7 +422,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
   CHECK(future->isReady());
 
   if (future->get().isSome()) {
-    Variable<string>* variable = new Variable<string>(future->get().get());
+    Variable* variable = new Variable(future->get().get());
 
     // Variable variable = new Variable();
     jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -432,14 +442,13 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_get_timeout
+ * Method:    __store_get_timeout
  * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1get_1timeout
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1get_1timeout
   (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
 
   jclass clazz = env->GetObjectClass(junit);
 
@@ -464,7 +473,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
     CHECK(future->isReady());
 
     if (future->get().isSome()) {
-      Variable<string>* variable = new Variable<string>(future->get().get());
+      Variable* variable = new Variable(future->get().get());
 
       // Variable variable = new Variable();
       clazz = env->FindClass("org/apache/mesos/state/Variable");
@@ -490,14 +499,186 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1ge
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
- * Method:    __set_finalize
+ * Method:    __store_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1store_1finalize
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
+
+  delete future;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge
+ * Signature: (Lorg/apache/mesos/state/Variable;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge
+  (JNIEnv* env, jobject thiz, jobject jvariable)
+{
+  jclass clazz = env->GetObjectClass(jvariable);
+
+  jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+
+  Variable* variable = (Variable*) env->GetLongField(jvariable, __variable);
+
+  clazz = env->GetObjectClass(thiz);
+
+  jfieldID __state = env->GetFieldID(clazz, "__state", "J");
+
+  State* state = (State*) env->GetLongField(thiz, __state);
+
+  Future<bool>* future = new Future<bool>(state->expunge(*variable));
+
+  return (jlong) future;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1cancel
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<bool>* future = (Future<bool>*) jfuture;
+
+  if (!future->isDiscarded()) {
+    future->discard();
+    return (jboolean) future->isDiscarded();
+  }
+
+  return (jboolean) true;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1is_1cancelled
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<bool>* future = (Future<bool>*) jfuture;
+
+  return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1is_1done
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<bool>* future = (Future<bool>*) jfuture;
+
+  return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_get
+ * Signature: (J)Ljava/lang/Boolean;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1get
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<bool>* future = (Future<bool>*) jfuture;
+
+  future->await();
+
+  if (future->isFailed()) {
+    jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
+    env->ThrowNew(clazz, future->failure().c_str());
+    return NULL;
+  } else if (future->isDiscarded()) {
+    jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
+    env->ThrowNew(clazz, "Future was discarded");
+    return NULL;
+  }
+
+  CHECK(future->isReady());
+
+  if (future->get()) {
+    jclass clazz = env->FindClass("java/lang/Boolean");
+    jfieldID TRUE = env->GetStaticFieldID(clazz, "TRUE", "Ljava/lang/Boolean;");
+    return env->GetStaticObjectField(clazz, TRUE);
+  }
+
+  jclass clazz = env->FindClass("java/lang/Boolean");
+  jfieldID FALSE = env->GetStaticFieldID(clazz, "FALSE", "Ljava/lang/Boolean;");
+  return env->GetStaticObjectField(clazz, FALSE);
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_get_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Ljava/lang/Boolean;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1get_1timeout
+  (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
+{
+  Future<bool>* future = (Future<bool>*) jfuture;
+
+  jclass clazz = env->GetObjectClass(junit);
+
+  // long seconds = unit.toSeconds(time);
+  jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+  jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+  Seconds seconds(jseconds);
+
+  if (future->await(seconds)) {
+    if (future->isFailed()) {
+      clazz = env->FindClass("java/util/concurrent/ExecutionException");
+      env->ThrowNew(clazz, future->failure().c_str());
+      return NULL;
+    } else if (future->isDiscarded()) {
+      clazz = env->FindClass("java/util/concurrent/CancellationException");
+      env->ThrowNew(clazz, "Future was discarded");
+      return NULL;
+    }
+
+    CHECK(future->isReady());
+
+    if (future->get()) {
+      jclass clazz = env->FindClass("java/lang/Boolean");
+      jfieldID TRUE = env->GetStaticFieldID(clazz, "TRUE", "Ljava/lang/Boolean;");
+      return env->GetStaticObjectField(clazz, TRUE);
+    }
+
+    jclass clazz = env->FindClass("java/lang/Boolean");
+    jfieldID FALSE = env->GetStaticFieldID(clazz, "FALSE", "Ljava/lang/Boolean;");
+    return env->GetStaticObjectField(clazz, FALSE);
+  }
+
+  clazz = env->FindClass("java/util/concurrent/TimeoutException");
+  env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+  return NULL;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __expunge_finalize
  * Signature: (J)V
  */
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1finalize
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1expunge_1finalize
   (JNIEnv* env, jobject thiz, jlong jfuture)
 {
-  Future<Option<Variable<string> > >* future =
-    (Future<Option<Variable<string> > >*) jfuture;
+  Future<bool>* future = (Future<bool>*) jfuture;
 
   delete future;
 }
@@ -515,9 +696,10 @@ JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1names
 
   jfieldID __state = env->GetFieldID(clazz, "__state", "J");
 
-  State<>* state = (State<>*) env->GetLongField(thiz, __state);
+  State* state = (State*) env->GetLongField(thiz, __state);
 
-  Future<vector<string> >* future = new Future<vector<string> >(state->names());
+  Future<vector<string> >* future =
+    new Future<vector<string> >(state->names());
 
   return (jlong) future;
 }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/InMemoryState.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/InMemoryState.java b/src/java/src/org/apache/mesos/state/InMemoryState.java
index 5de921e..7addd2d 100644
--- a/src/java/src/org/apache/mesos/state/InMemoryState.java
+++ b/src/java/src/org/apache/mesos/state/InMemoryState.java
@@ -32,7 +32,7 @@ import java.util.concurrent.FutureTask;
  */
 public class InMemoryState implements State {
   @Override
-  public Future<Variable> get(String name) {
+  public Future<Variable> fetch(String name) {
     Entry entry = entries.get(name); // Is null if doesn't exist.
 
     if (entry == null) {
@@ -40,17 +40,21 @@ public class InMemoryState implements State {
       entry.name = name;
       entry.uuid = UUID.randomUUID();
       entry.value = new byte[0];
-      entries.put(name, entry);
-      entry = entries.putIfAbsent(name, entry);
+
+      // We use 'putIfAbsent' because multiple threads might be
+      // attempting to fetch a "new" variable at the same time.
+      if (entries.putIfAbsent(name, entry) != null) {
+        return fetch(name);
+      }
     }
 
-    assert entry != null; // ConcurrentMap.putIfAbsent should not return null.
+    assert entry != null;
 
     return futureFrom((Variable) new InMemoryVariable(entry));
   }
 
   @Override
-  public Future<Variable> set(Variable v) {
+  public Future<Variable> store(Variable v) {
     InMemoryVariable variable = (InMemoryVariable) v;
 
     Entry entry = new Entry();
@@ -66,6 +70,13 @@ public class InMemoryState implements State {
   }
 
   @Override
+  public Future<Boolean> expunge(Variable v) {
+    InMemoryVariable variable = (InMemoryVariable) v;
+
+    return futureFrom(entries.remove(variable.entry.name, variable.entry));
+  }
+
+  @Override
   public Future<Iterator<String>> names() {
     return futureFrom(entries.keySet().iterator());
   }

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/State.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/State.java b/src/java/src/org/apache/mesos/state/State.java
index 81b8685..dccb0ff 100644
--- a/src/java/src/org/apache/mesos/state/State.java
+++ b/src/java/src/org/apache/mesos/state/State.java
@@ -30,18 +30,18 @@ import java.util.concurrent.Future;
  * fetched. Varying implementations of state provide varying
  * replicated guarantees.
  *
- * Note that the semantics of 'get' and 'set' provide atomicity. That
- * is, you can not set a variable that has changed since you did the
- * last get. That is, if a set succeeds then no other writes have been
- * performed on the variable since your get.
+ * Note that the semantics of 'fetch' and 'store' provide
+ * atomicity. That is, you can not store a variable that has changed
+ * since you did the last fetch. That is, if a store succeeds then no
+ * other writes have been performed on the variable since your fetch.
  *
  * Example:
  *
  *   State state = new ZooKeeperState();
- *   Future<Variable> variable = state.get("machines");
+ *   Future<Variable> variable = state.fetch("machines");
  *   Variable machines = variable.get();
  *   machines = machines.mutate(...);
- *   variable = state.set(machines);
+ *   variable = state.store(machines);
  *   machines = variable.get();
  */
 public interface State {
@@ -49,14 +49,20 @@ public interface State {
    * Returns an immutable "variable" representing the current value
    * from the state associated with the specified name.
    */
-  Future<Variable> get(String name);
+  Future<Variable> fetch(String name);
 
   /**
    * Returns an immutable "variable" representing the current value in
    * the state if updating the specified variable in the state was
    * successful, otherwise returns null.
    */
-  Future<Variable> set(Variable variable);
+  Future<Variable> store(Variable variable);
+
+  /**
+   * Returns true if successfully expunged the variable from the state
+   * or false if the variable did not exist or was no longer valid.
+   */
+  Future<Boolean> expunge(Variable variable);
 
   /**
    * Returns an iterator of variable names in the state.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
----------------------------------------------------------------------
diff --git a/src/java/src/org/apache/mesos/state/ZooKeeperState.java b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
index 1ef591a..999d593 100644
--- a/src/java/src/org/apache/mesos/state/ZooKeeperState.java
+++ b/src/java/src/org/apache/mesos/state/ZooKeeperState.java
@@ -73,86 +73,125 @@ public class ZooKeeperState implements State {
   }
 
   @Override
-  public Future<Variable> get(final String name) {
-    final long future = __get(name); // Asynchronously start the operation.
+  public Future<Variable> fetch(final String name) {
+    final long future = __fetch(name); // Asynchronously start the operation.
     return new Future<Variable>() {
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
         if (mayInterruptIfRunning) {
-          return __get_cancel(future);
+          return __fetch_cancel(future);
         }
         return false; // Should not interrupt and already running (or finished).
       }
 
       @Override
       public boolean isCancelled() {
-        return __get_is_cancelled(future);
+        return __fetch_is_cancelled(future);
       }
 
       @Override
       public boolean isDone() {
-        return __get_is_done(future);
+        return __fetch_is_done(future);
       }
 
       @Override
       public Variable get() throws InterruptedException, ExecutionException {
-        return __get_get(future);
+        return __fetch_get(future);
       }
 
       @Override
       public Variable get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-        return __get_get_timeout(future, timeout, unit);
+        return __fetch_get_timeout(future, timeout, unit);
       }
 
       @Override
       protected void finalize() {
-        __get_finalize(future);
+        __fetch_finalize(future);
       }
     };
   }
 
   @Override
-  public Future<Variable> set(Variable variable) {
-    final long future = __set(variable); // Asynchronously start the operation.
+  public Future<Variable> store(Variable variable) {
+    final long future = __store(variable); // Asynchronously start the operation.
     return new Future<Variable>() {
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
         if (mayInterruptIfRunning) {
-          return __set_cancel(future);
+          return __store_cancel(future);
         }
         return false; // Should not interrupt and already running (or finished).
       }
 
       @Override
       public boolean isCancelled() {
-        return __set_is_cancelled(future);
+        return __store_is_cancelled(future);
       }
 
       @Override
       public boolean isDone() {
-        return __set_is_done(future);
+        return __store_is_done(future);
       }
 
       @Override
       public Variable get() throws InterruptedException, ExecutionException {
-        return __set_get(future);
+        return __store_get(future);
       }
 
       @Override
       public Variable get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException {
-        return __set_get_timeout(future, timeout, unit);
+        return __store_get_timeout(future, timeout, unit);
       }
 
       @Override
       protected void finalize() {
-        __set_finalize(future);
+        __store_finalize(future);
       }
     };
   }
 
   @Override
+  public Future<Boolean> expunge(Variable variable) {
+    final long future = __expunge(variable); // Asynchronously start the operation.
+    return new Future<Boolean>() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (mayInterruptIfRunning) {
+          return __expunge_cancel(future);
+        }
+        return false; // Should not interrupt and already running (or finished).
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return __expunge_is_cancelled(future);
+      }
+
+      @Override
+      public boolean isDone() {
+        return __expunge_is_done(future);
+      }
+
+      @Override
+      public Boolean get() throws InterruptedException, ExecutionException {
+        return __expunge_get(future);
+      }
+
+      @Override
+      public Boolean get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return __expunge_get_timeout(future, timeout, unit);
+      }
+
+      @Override
+      protected void finalize() {
+        __expunge_finalize(future);
+      }
+    };
+  }
+
   public Future<Iterator<String>> names() {
     final long future = __names(); // Asynchronously start the operation.
     return new Future<Iterator<String>>() {
@@ -206,24 +245,33 @@ public class ZooKeeperState implements State {
 
   protected native void finalize();
 
-  // Native implementations of get, set, and names.
-  private native long __get(String name);
-  private native boolean __get_cancel(long future);
-  private native boolean __get_is_cancelled(long future);
-  private native boolean __get_is_done(long future);
-  private native Variable __get_get(long future);
-  private native Variable __get_get_timeout(
+  // Native implementations of 'fetch', 'store', 'expunge', and 'names'.
+  private native long __fetch(String name);
+  private native boolean __fetch_cancel(long future);
+  private native boolean __fetch_is_cancelled(long future);
+  private native boolean __fetch_is_done(long future);
+  private native Variable __fetch_get(long future);
+  private native Variable __fetch_get_timeout(
+      long future, long timeout, TimeUnit unit);
+  private native void __fetch_finalize(long future);
+
+  private native long __store(Variable variable);
+  private native boolean __store_cancel(long future);
+  private native boolean __store_is_cancelled(long future);
+  private native boolean __store_is_done(long future);
+  private native Variable __store_get(long future);
+  private native Variable __store_get_timeout(
       long future, long timeout, TimeUnit unit);
-  private native void __get_finalize(long future);
-
-  private native long __set(Variable variable);
-  private native boolean __set_cancel(long future);
-  private native boolean __set_is_cancelled(long future);
-  private native boolean __set_is_done(long future);
-  private native Variable __set_get(long future);
-  private native Variable __set_get_timeout(
+  private native void __store_finalize(long future);
+
+  private native long __expunge(Variable variable);
+  private native boolean __expunge_cancel(long future);
+  private native boolean __expunge_is_cancelled(long future);
+  private native boolean __expunge_is_done(long future);
+  private native Boolean __expunge_get(long future);
+  private native Boolean __expunge_get_timeout(
       long future, long timeout, TimeUnit unit);
-  private native void __set_finalize(long future);
+  private native void __expunge_finalize(long future);
 
   private native long __names();
   private native boolean __names_cancel(long future);
@@ -234,5 +282,6 @@ public class ZooKeeperState implements State {
       long future, long timeout, TimeUnit unit);
   private native void __names_finalize(long future);
 
+  private long __storage;
   private long __state;
 };

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/master/registry.hpp
----------------------------------------------------------------------
diff --git a/src/master/registry.hpp b/src/master/registry.hpp
new file mode 100644
index 0000000..d3cbe56
--- /dev/null
+++ b/src/master/registry.hpp
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_REGISTRY_HPP__
+#define __MASTER_REGISTRY_HPP__
+
+#include "master/registry.pb.h"
+
+#endif // __MASTER_REGISTRY_HPP__

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
new file mode 100644
index 0000000..877bfa1
--- /dev/null
+++ b/src/master/registry.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import "mesos.proto";
+
+package mesos.internal.registry;
+
+message Slave {
+  // TODO(benh): Add other information here that is internal to Mesos
+  // and shouldn't be exposed in SlaveInfo.
+  required SlaveInfo info = 2;
+}
+
+
+message Registry {
+  repeated Slave slaves = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/messages/messages.hpp
----------------------------------------------------------------------
diff --git a/src/messages/messages.hpp b/src/messages/messages.hpp
index 285cc65..98038c0 100644
--- a/src/messages/messages.hpp
+++ b/src/messages/messages.hpp
@@ -19,6 +19,46 @@
 #ifndef __MESSAGES_HPP__
 #define __MESSAGES_HPP__
 
+#include <google/protobuf/message.h>
+
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
+#include <string>
+
+#include <stout/try.hpp>
+
 #include "messages/messages.pb.h"
 
+namespace messages {
+
+template <typename T>
+Try<T> deserialize(const std::string& value)
+{
+  T t;
+  (void) static_cast<google::protobuf::Message*>(&t);
+
+  google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
+  if (!t.ParseFromZeroCopyStream(&stream)) {
+    return Try<T>::error(
+        "Failed to deserialize " + t.GetDescriptor()->full_name());
+  }
+  return t;
+}
+
+
+template <typename T>
+Try<std::string> serialize(const T& t)
+{
+  (void) static_cast<const google::protobuf::Message*>(&t);
+
+  std::string value;
+  if (!t.SerializeToString(&value)) {
+    return Try<std::string>::error(
+        "Failed to serialize " + t.GetDescriptor()->full_name());
+  }
+  return value;
+}
+
+} // namespace messages {
+
 #endif // __MESSAGES_HPP__

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 3f02474..2c196ee 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -73,12 +73,6 @@ message StatusUpdateRecord {
 }
 
 
-message Slaves
-{
-  repeated SlaveInfo infos = 1;
-}
-
-
 message SubmitSchedulerRequest
 {
   required string name = 1;

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/leveldb.cpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.cpp b/src/state/leveldb.cpp
index 5b3fe91..69d5714 100644
--- a/src/state/leveldb.cpp
+++ b/src/state/leveldb.cpp
@@ -2,6 +2,8 @@
 
 #include <google/protobuf/message.h>
 
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
 #include <string>
 #include <vector>
 
@@ -20,7 +22,7 @@
 #include "messages/state.hpp"
 
 #include "state/leveldb.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
 
 using namespace process;
 
@@ -31,17 +33,17 @@ namespace mesos {
 namespace internal {
 namespace state {
 
-LevelDBStateProcess::LevelDBStateProcess(const string& _path)
+LevelDBStorageProcess::LevelDBStorageProcess(const string& _path)
   : path(_path), db(NULL) {}
 
 
-LevelDBStateProcess::~LevelDBStateProcess()
+LevelDBStorageProcess::~LevelDBStorageProcess()
 {
-  delete db; // Might be null if open failed in LevelDBStateProcess::initialize.
+  delete db; // NULL if open failed in LevelDBStorageProcess::initialize.
 }
 
 
-void LevelDBStateProcess::initialize()
+void LevelDBStorageProcess::initialize()
 {
   leveldb::Options options;
   options.create_if_missing = true;
@@ -58,7 +60,7 @@ void LevelDBStateProcess::initialize()
 }
 
 
-Future<vector<string> > LevelDBStateProcess::names()
+Future<vector<string> > LevelDBStorageProcess::names()
 {
   if (error.isSome()) {
     return Future<vector<string> >::failed(error.get());
@@ -81,13 +83,13 @@ Future<vector<string> > LevelDBStateProcess::names()
 }
 
 
-Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
+Future<Option<Entry> > LevelDBStorageProcess::get(const string& name)
 {
   if (error.isSome()) {
     return Future<Option<Entry> >::failed(error.get());
   }
 
-  Try<Option<Entry> > option = get(name);
+  Try<Option<Entry> > option = read(name);
 
   if (option.isError()) {
     return Future<Option<Entry> >::failed(option.error());
@@ -97,16 +99,16 @@ Future<Option<Entry> > LevelDBStateProcess::fetch(const string& name)
 }
 
 
-Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
+Future<bool> LevelDBStorageProcess::set(const Entry& entry, const UUID& uuid)
 {
   if (error.isSome()) {
     return Future<bool>::failed(error.get());
   }
 
-  // We do a fetch first to make sure the version has not changed. This
+  // We do a read first to make sure the version has not changed. This
   // could be optimized in the future, for now it will probably hit
   // the cache anyway.
-  Try<Option<Entry> > option = get(entry.name());
+  Try<Option<Entry> > option = read(entry.name());
 
   if (option.isError()) {
     return Future<bool>::failed(option.error());
@@ -118,11 +120,11 @@ Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
     }
   }
 
-  // Note that there is no need to do the DB::Get and DB::Put
-  // "atomically" because only one db can be opened at a time, so
-  // there can not be any writes that occur concurrently.
+  // Note that the read (i.e., DB::Get) and the write (i.e., DB::Put)
+  // are inherently "atomic" because only one db can be opened at a
+  // time, so there can not be any writes that occur concurrently.
 
-  Try<bool> result = put(entry);
+  Try<bool> result = write(entry);
 
   if (result.isError()) {
     return Future<bool>::failed(result.error());
@@ -132,7 +134,48 @@ Future<bool> LevelDBStateProcess::swap(const Entry& entry, const UUID& uuid)
 }
 
 
-Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
+Future<bool> LevelDBStorageProcess::expunge(const Entry& entry)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  }
+
+  // We do a read first to make sure the version has not changed. This
+  // could be optimized in the future, for now it will probably hit
+  // the cache anyway.
+  Try<Option<Entry> > option = read(entry.name());
+
+  if (option.isError()) {
+    return Future<bool>::failed(option.error());
+  }
+
+  if (option.get().isNone()) {
+    return false;
+  }
+
+  if (UUID::fromBytes(option.get().get().uuid()) !=
+      UUID::fromBytes(entry.uuid())) {
+    return false;
+  }
+
+  // Note that the read (i.e., DB::Get) and DB::Delete are inherently
+  // "atomic" because only one db can be opened at a time, so there
+  // can not be any writes that occur concurrently.
+
+  leveldb::WriteOptions options;
+  options.sync = true;
+
+  leveldb::Status status = db->Delete(options, entry.name());
+
+  if (!status.ok()) {
+    return Future<bool>::failed(status.ToString());
+  }
+
+  return true;
+}
+
+
+Try<Option<Entry> > LevelDBStorageProcess::read(const string& name)
 {
   CHECK(error.isNone());
 
@@ -160,7 +203,7 @@ Try<Option<Entry> > LevelDBStateProcess::get(const string& name)
 }
 
 
-Try<bool> LevelDBStateProcess::put(const Entry& entry)
+Try<bool> LevelDBStorageProcess::write(const Entry& entry)
 {
   CHECK(error.isNone());
 

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/leveldb.hpp
----------------------------------------------------------------------
diff --git a/src/state/leveldb.hpp b/src/state/leveldb.hpp
index 94f306b..14a94cc 100644
--- a/src/state/leveldb.hpp
+++ b/src/state/leveldb.hpp
@@ -14,8 +14,7 @@
 
 #include "messages/state.hpp"
 
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
 
 // Forward declarations.
 namespace leveldb { class DB; }
@@ -26,46 +25,44 @@ namespace internal {
 namespace state {
 
 // More forward declarations.
-class LevelDBStateProcess;
+class LevelDBStorageProcess;
 
 
-template <typename Serializer = StringSerializer>
-class LevelDBState : public State<Serializer>
+class LevelDBStorage : public Storage
 {
 public:
-  LevelDBState(const std::string& path);
-  virtual ~LevelDBState();
+  LevelDBStorage(const std::string& path);
+  virtual ~LevelDBStorage();
 
-  // State implementation.
+  // Storage implementation.
+  virtual process::Future<Option<Entry> > get(const std::string& name);
+  virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+  virtual process::Future<bool> expunge(const Entry& entry);
   virtual process::Future<std::vector<std::string> > names();
 
-protected:
-  // More State implementation.
-  virtual process::Future<Option<Entry> > fetch(const std::string& name);
-  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
 private:
-  LevelDBStateProcess* process;
+  LevelDBStorageProcess* process;
 };
 
 
-class LevelDBStateProcess : public process::Process<LevelDBStateProcess>
+class LevelDBStorageProcess : public process::Process<LevelDBStorageProcess>
 {
 public:
-  LevelDBStateProcess(const std::string& path);
-  virtual ~LevelDBStateProcess();
+  LevelDBStorageProcess(const std::string& path);
+  virtual ~LevelDBStorageProcess();
 
   virtual void initialize();
 
-  // State implementation.
+  // Storage implementation.
+  process::Future<Option<Entry> > get(const std::string& name);
+  process::Future<bool> set(const Entry& entry, const UUID& uuid);
+  process::Future<bool> expunge(const Entry& entry);
   process::Future<std::vector<std::string> > names();
-  process::Future<Option<Entry> > fetch(const std::string& name);
-  process::Future<bool> swap(const Entry& entry, const UUID& uuid);
 
 private:
   // Helpers for interacting with leveldb.
-  Try<Option<Entry> > get(const std::string& name);
-  Try<bool> put(const Entry& entry);
+  Try<Option<Entry> > read(const std::string& name);
+  Try<bool> write(const Entry& entry);
 
   const std::string path;
   leveldb::DB* db;
@@ -74,16 +71,14 @@ private:
 };
 
 
-template <typename Serializer>
-LevelDBState<Serializer>::LevelDBState(const std::string& path)
+inline LevelDBStorage::LevelDBStorage(const std::string& path)
 {
-  process = new LevelDBStateProcess(path);
+  process = new LevelDBStorageProcess(path);
   process::spawn(process);
 }
 
 
-template <typename Serializer>
-LevelDBState<Serializer>::~LevelDBState()
+inline LevelDBStorage::~LevelDBStorage()
 {
   process::terminate(process);
   process::wait(process);
@@ -91,27 +86,31 @@ LevelDBState<Serializer>::~LevelDBState()
 }
 
 
-template <typename Serializer>
-process::Future<std::vector<std::string> > LevelDBState<Serializer>::names()
+inline process::Future<Option<Entry> > LevelDBStorage::get(
+    const std::string& name)
 {
-  return process::dispatch(process, &LevelDBStateProcess::names);
+  return process::dispatch(process, &LevelDBStorageProcess::get, name);
 }
 
 
-template <typename Serializer>
-process::Future<Option<Entry> > LevelDBState<Serializer>::fetch(
-    const std::string& name)
+inline process::Future<bool> LevelDBStorage::set(
+    const Entry& entry,
+    const UUID& uuid)
 {
-  return process::dispatch(process, &LevelDBStateProcess::fetch, name);
+  return process::dispatch(process, &LevelDBStorageProcess::set, entry, uuid);
 }
 
 
-template <typename Serializer>
-process::Future<bool> LevelDBState<Serializer>::swap(
-    const Entry& entry,
-    const UUID& uuid)
+inline process::Future<bool> LevelDBStorage::expunge(
+    const Entry& entry)
+{
+  return process::dispatch(process, &LevelDBStorageProcess::expunge, entry);
+}
+
+
+inline process::Future<std::vector<std::string> > LevelDBStorage::names()
 {
-  return process::dispatch(process, &LevelDBStateProcess::swap, entry, uuid);
+  return process::dispatch(process, &LevelDBStorageProcess::names);
 }
 
 } // namespace state {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/protobuf.hpp
----------------------------------------------------------------------
diff --git a/src/state/protobuf.hpp b/src/state/protobuf.hpp
new file mode 100644
index 0000000..75e082b
--- /dev/null
+++ b/src/state/protobuf.hpp
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_PROTOBUF_HPP__
+#define __STATE_PROTOBUF_HPP__
+
+#include <string>
+
+#include <process/future.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/messages.hpp"
+#include "messages/state.hpp"
+
+#include "state/state.hpp"
+#include "state/storage.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+namespace protobuf {
+
+template <typename T>
+class Variable
+{
+public:
+  T get() const
+  {
+    return t;
+  }
+
+  Variable mutate(const T& t) const
+  {
+    Variable variable(*this);
+    variable.t = t;
+    return variable;
+  }
+
+private:
+  friend class State; // Creates and manages variables.
+
+  Variable(const state::Variable& _variable, const T& _t)
+    : variable(_variable), t(_t)
+  {}
+
+  state::Variable variable; // Not const to keep Variable assignable.
+  T t;
+};
+
+
+class State : public state::State
+{
+public:
+  State(Storage* storage) : state::State(storage) {}
+  virtual ~State() {}
+
+  // Returns a variable from the state, creating a new one if one
+  // previously did not exist (or an error if one occurs).
+  template <typename T>
+  process::Future<Variable<T> > fetch(const std::string& name);
+
+  // Returns the variable specified if it was successfully stored in
+  // the state, otherwise returns none if the version of the variable
+  // was no longer valid, or an error if one occurs.
+  template <typename T>
+  process::Future<Option<Variable<T> > > store(const Variable<T>& variable);
+
+  // Expunges the variable from the state.
+  template <typename T>
+  process::Future<bool> expunge(const Variable<T>& variable);
+
+private:
+  // Helpers to handle future results from fetch and swap. We make
+  // these static members of State for friend access to Variable's
+  // constructor.
+  template <typename T>
+  static process::Future<Variable<T> > _fetch(
+      const state::Variable& option);
+
+  template <typename T>
+  static process::Future<Option<Variable<T> > > _store(
+      const T& t,
+      const Option<state::Variable>& variable);
+};
+
+
+template <typename T>
+process::Future<Variable<T> > State::fetch(const std::string& name)
+{
+  return state::State::fetch(name)
+    .then(lambda::bind(&State::template _fetch<T>, lambda::_1));
+}
+
+
+template <typename T>
+process::Future<Variable<T> > State::_fetch(
+    const state::Variable& variable)
+{
+  Try<T> t = messages::deserialize<T>(variable.value());
+  if (t.isError()) {
+    return process::Future<Variable<T> >::failed(t.error());
+  }
+
+  return Variable<T>(variable, t.get());
+}
+
+
+template <typename T>
+process::Future<Option<Variable<T> > > State::store(
+    const Variable<T>& variable)
+{
+  Try<std::string> value = messages::serialize(variable.t);
+
+  if (value.isError()) {
+    return process::Future<Option<Variable<T> > >::failed(value.error());
+  }
+
+  return state::State::store(variable.variable.mutate(value.get()))
+    .then(lambda::bind(&State::template _store<T>, variable.t, lambda::_1));
+}
+
+
+template <typename T>
+process::Future<Option<Variable<T> > > State::_store(
+    const T& t,
+    const Option<state::Variable>& variable)
+{
+  if (variable.isSome()) {
+    return Option<Variable<T> >::some(Variable<T>(variable.get(), t));
+  }
+
+  return None();
+}
+
+
+template <typename T>
+process::Future<bool> State::expunge(const Variable<T>& variable)
+{
+  return state::State::expunge(variable.variable);
+}
+
+} // namespace protobuf {
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_PROTOBUF_HPP__

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/serializer.hpp
----------------------------------------------------------------------
diff --git a/src/state/serializer.hpp b/src/state/serializer.hpp
deleted file mode 100644
index bd8c5df..0000000
--- a/src/state/serializer.hpp
+++ /dev/null
@@ -1,67 +0,0 @@
-#ifndef __STATE_SERIALIZER_HPP__
-#define __STATE_SERIALIZER_HPP__
-
-#include <google/protobuf/message.h>
-
-#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
-
-#include <string>
-
-#include <stout/error.hpp>
-#include <stout/try.hpp>
-
-namespace mesos {
-namespace internal {
-namespace state {
-
-struct StringSerializer
-{
-  template <typename T>
-  static Try<std::string> deserialize(const std::string& value)
-  {
-    return value;
-  }
-
-  template <typename T>
-  static Try<std::string> serialize(const std::string& value)
-  {
-    return value;
-  }
-};
-
-
-struct ProtobufSerializer
-{
-  template <typename T>
-  static Try<T> deserialize(const std::string& value)
-  {
-    T t;
-    (void)static_cast<google::protobuf::Message*>(&t);
-
-    google::protobuf::io::ArrayInputStream stream(value.data(), value.size());
-    if (!t.ParseFromZeroCopyStream(&stream)) {
-      return Error(
-          "Failed to deserialize " + t.GetDescriptor()->full_name());
-    }
-    return t;
-  }
-
-  template <typename T>
-  static Try<std::string> serialize(const T& t)
-  {
-    // TODO(benh): Actually store the descriptor so that we can verify
-    // type information (and compatibility) when we deserialize.
-    std::string value;
-    if (!t.SerializeToString(&value)) {
-      return Error(
-          "Failed to serialize " + t.GetDescriptor()->full_name());
-    }
-    return value;
-  }
-};
-
-} // namespace state {
-} // namespace internal {
-} // namespace mesos {
-
-#endif // __STATE_SERIALIZER_HPP__

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/state.hpp
----------------------------------------------------------------------
diff --git a/src/state/state.hpp b/src/state/state.hpp
index c616e00..b3abf89 100644
--- a/src/state/state.hpp
+++ b/src/state/state.hpp
@@ -24,16 +24,15 @@
 
 #include <process/future.hpp>
 
+#include <stout/lambda.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
-#include "logging/logging.hpp"
-
 #include "messages/state.hpp"
 
-#include "state/serializer.hpp"
+#include "state/storage.hpp"
 
 namespace mesos {
 namespace internal {
@@ -46,187 +45,153 @@ namespace state {
 // fetched. Varying implementations of state provide varying
 // replicated guarantees.
 //
-// Note that the semantics of 'get' and 'set' provide atomicity. That
-// is, you can not set a variable that has changed since you did the
-// last get. That is, if a set succeeds then no other writes have been
-// performed on the variable since your get.
-
+// Note that the semantics of 'fetch' and 'store' provide
+// atomicity. That is, you can not store a variable that has changed
+// since you did the last fetch. That is, if a store succeeds then no
+// other writes have been performed on the variable since your fetch.
+//
 // Example:
-
-//   State<ProtobufSerializer>* state = new ZooKeeperState<ProtobufSerializer>();
-//   Future<Variable<Slaves> > variable = state->get<Slaves>("slaves");
-//   Variable<Slaves> slaves = variable.get();
-//   slaves->add_infos()->MergeFrom(info);
-//   Future<bool> set = state->set(&slaves);
+//
+//   Storage* storage = new ZooKeeperStorage();
+//   State* state = new State(storage);
+//   Future<Variable> variable = state->fetch("slaves");
+//   std::string value = update(variable.value());
+//   variable = variable.mutate(value);
+//   state->store(variable);
 
 // Forward declarations.
-template <typename Serializer>
 class State;
-class ZooKeeperStateProcess;
 
 
-template <typename T>
+// Wrapper around a state "entry" to force immutability.
 class Variable
 {
 public:
-  T* operator -> ()
+  std::string value() const
   {
-    return &t;
+    return entry.value();
+  }
+
+  Variable mutate(const std::string& value) const
+  {
+    Variable variable(*this);
+    variable.entry.set_value(value);
+    return variable;
   }
 
 private:
-  template <typename Serializer>
   friend class State; // Creates and manages variables.
 
-  Variable(const Entry& _entry, const T& _t)
-    : entry(_entry), t(_t)
+  Variable(const Entry& _entry)
+    : entry(_entry)
   {}
 
-  Entry entry; // Not const so Variable is copyable.
-  T t;
+  Entry entry; // Not const to keep Variable assignable.
 };
 
 
-template <typename Serializer = StringSerializer>
 class State
 {
 public:
-  State() {}
+  State(Storage* _storage) : storage(_storage) {}
   virtual ~State() {}
 
   // Returns a variable from the state, creating a new one if one
   // previously did not exist (or an error if one occurs).
-  template <typename T>
-  process::Future<Variable<T> > get(const std::string& name);
+  process::Future<Variable> fetch(const std::string& name);
 
-  // Returns true if the variable was successfully set in the state,
-  // otherwise false if the version of the variable was no longer
-  // valid (or an error if one occurs).
-  template <typename T>
-  process::Future<Option<Variable<T> > > set(const Variable<T>& variable);
+  // Returns the variable specified if it was successfully stored in
+  // the state, otherwise returns none if the version of the variable
+  // was no longer valid, or an error if one occurs.
+  process::Future<Option<Variable> > store(const Variable& variable);
 
-  // Returns the collection of variable names in the state.
-  virtual process::Future<std::vector<std::string> > names() = 0;
+  // Returns true if successfully expunged the variable from the state.
+  process::Future<bool> expunge(const Variable& variable);
 
-protected:
-  // Fetch and swap state entries, factored out to allow State
-  // implementations to be agnostic of Variable which is templated.
-  virtual process::Future<Option<Entry> > fetch(const std::string& name) = 0;
-  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid) = 0;
+  // Returns the collection of variable names in the state.
+  process::Future<std::vector<std::string> > names();
 
 private:
   // Helpers to handle future results from fetch and swap. We make
   // these static members of State for friend access to Variable's
   // constructor.
-  template <typename T>
-  static process::Future<Variable<T> > _get(
+  static process::Future<Variable> _fetch(
       const std::string& name,
       const Option<Entry>& option);
 
-  template <typename T>
-  static process::Future<Option<Variable<T> > > _set(
+  static process::Future<Option<Variable> > _store(
       const Entry& entry,
-      const T& t,
       const bool& b); // TODO(benh): Remove 'const &' after fixing libprocess.
+
+  Storage* storage;
 };
 
 
-template <typename Serializer>
-template <typename T>
-process::Future<Variable<T> > State<Serializer>::_get(
+inline process::Future<Variable> State::fetch(const std::string& name)
+{
+  return storage->get(name)
+    .then(lambda::bind(&State::_fetch, name, lambda::_1));
+}
+
+
+inline process::Future<Variable> State::_fetch(
     const std::string& name,
     const Option<Entry>& option)
 {
   if (option.isSome()) {
-    const Entry& entry = option.get();
-
-    Try<T> t = Serializer::template deserialize<T>(entry.value());
-
-    if (t.isError()) {
-      return process::Future<Variable<T> >::failed(t.error());
-    }
-
-    return Variable<T>(entry, t.get());
+    return Variable(option.get());
   }
 
-  // Otherwise, construct a Variable out of a new Entry with a default
-  // value for T (and a random UUID to start).
-  T t;
+  // Otherwise, construct a Variable with a new Entry (with a random
+  // UUID and no value to start).
+  Entry entry;
+  entry.set_name(name);
+  entry.set_uuid(UUID::random().toBytes());
+
+  return Variable(entry);
+}
 
-  Try<std::string> value = Serializer::template serialize<T>(t);
 
-  if (value.isError()) {
-    return process::Future<Variable<T> >::failed(value.error());
-  }
+inline process::Future<Option<Variable> > State::store(const Variable& variable)
+{
+  // Note that we try and swap an entry even if the value didn't change!
+  UUID uuid = UUID::fromBytes(variable.entry.uuid());
 
+  // Create a new entry to replace the existing entry provided the
+  // UUID matches.
   Entry entry;
-  entry.set_name(name);
+  entry.set_name(variable.entry.name());
   entry.set_uuid(UUID::random().toBytes());
-  entry.set_value(value.get());
+  entry.set_value(variable.entry.value());
 
-  return Variable<T>(entry, t);
+  return storage->set(entry, uuid)
+    .then(lambda::bind(&State::_store, entry, lambda::_1));
 }
 
 
-template <typename Serializer>
-template <typename T>
-process::Future<Option<Variable<T> > > State<Serializer>::_set(
+inline process::Future<Option<Variable > > State::_store(
     const Entry& entry,
-    const T& t,
     const bool& b) // TODO(benh): Remove 'const &' after fixing libprocess.
 {
   if (b) {
-    return Option<Variable<T> >::some(Variable<T>(entry, t));
+    return Option<Variable>::some(Variable(entry));
   }
 
   return None();
 }
 
 
-template <typename Serializer>
-template <typename T>
-process::Future<Variable<T> > State<Serializer>::get(const std::string& name)
+inline process::Future<bool> State::expunge(const Variable& variable)
 {
-  return fetch(name)
-    .then(std::tr1::bind(&State<Serializer>::template _get<T>,
-                         name,
-                         std::tr1::placeholders::_1));
+  return storage->expunge(variable.entry);
 }
 
 
-template <typename Serializer>
-template <typename T>
-process::Future<Option<Variable<T> > > State<Serializer>::set(
-      const Variable<T>& variable)
+inline process::Future<std::vector<std::string> > State::names()
 {
-  Try<std::string> value = Serializer::template serialize<T>(variable.t);
-
-  if (value.isError()) {
-    return process::Future<Option<Variable<T> > >::failed(value.error());
-  }
-
-  // Note that we try and swap an entry even if the value didn't change!
-  UUID uuid = UUID::fromBytes(variable.entry.uuid());
-
-  // Create a new entry that should be replace the existing entry
-  // provided the UUID matches.
-  Entry entry;
-  entry.set_name(variable.entry.name());
-  entry.set_uuid(UUID::random().toBytes());
-  entry.set_value(value.get());
-
-  std::tr1::function<
-  process::Future<Option<Variable<T> > >(const bool&)> _set =
-    std::tr1::bind(&State<Serializer>::template _set<T>,
-                   entry,
-                   variable.t,
-                   std::tr1::placeholders::_1);
-
-  return swap(entry, uuid).then(_set);
+  return storage->names();
 }
 
-
-
 } // namespace state {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/storage.hpp
----------------------------------------------------------------------
diff --git a/src/state/storage.hpp b/src/state/storage.hpp
new file mode 100644
index 0000000..a137075
--- /dev/null
+++ b/src/state/storage.hpp
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __STATE_STORAGE_HPP__
+#define __STATE_STORAGE_HPP__
+
+#include <string>
+#include <vector>
+
+#include <process/future.hpp>
+
+#include <stout/option.hpp>
+#include <stout/uuid.hpp>
+
+#include "messages/state.hpp"
+
+namespace mesos {
+namespace internal {
+namespace state {
+
+class Storage
+{
+public:
+  Storage() {}
+  virtual ~Storage() {}
+
+  // Get and set state entries, factored out to allow Storage
+  // implementations to be agnostic of Variable. Note that set acts
+  // like a "test-and-set" by requiring the existing entry to have the
+  // specified UUID.
+  virtual process::Future<Option<Entry> > get(const std::string& name) = 0;
+  virtual process::Future<bool> set(const Entry& entry, const UUID& uuid) = 0;
+
+  // Returns true if successfully expunged the variable from the state.
+  virtual process::Future<bool> expunge(const Entry& entry) = 0;
+
+  // Returns the collection of variable names in the state.
+  virtual process::Future<std::vector<std::string> > names() = 0;
+};
+
+} // namespace state {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __STATE_STORAGE_HPP__

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/zookeeper.cpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.cpp b/src/state/zookeeper.cpp
index 07339ff..1801fce 100644
--- a/src/state/zookeeper.cpp
+++ b/src/state/zookeeper.cpp
@@ -1,5 +1,7 @@
 #include <google/protobuf/message.h>
 
+#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
+
 #include <queue>
 #include <string>
 #include <vector>
@@ -21,7 +23,7 @@
 
 #include "messages/state.hpp"
 
-#include "state/state.hpp"
+#include "state/storage.hpp"
 #include "state/zookeeper.hpp"
 
 #include "zookeeper/authentication.hpp"
@@ -53,7 +55,7 @@ void fail(queue<T*>* queue, const string& message)
 }
 
 
-ZooKeeperStateProcess::ZooKeeperStateProcess(
+ZooKeeperStorageProcess::ZooKeeperStorageProcess(
     const string& _servers,
     const Duration& _timeout,
     const string& _znode,
@@ -71,27 +73,27 @@ ZooKeeperStateProcess::ZooKeeperStateProcess(
 {}
 
 
-ZooKeeperStateProcess::~ZooKeeperStateProcess()
+ZooKeeperStorageProcess::~ZooKeeperStorageProcess()
 {
-  fail(&pending.names, "No longer managing state");
-  fail(&pending.fetches, "No longer managing state");
-  fail(&pending.swaps, "No longer managing state");
+  fail(&pending.names, "No longer managing storage");
+  fail(&pending.gets, "No longer managing storage");
+  fail(&pending.sets, "No longer managing storage");
 
   delete zk;
   delete watcher;
 }
 
 
-void ZooKeeperStateProcess::initialize()
+void ZooKeeperStorageProcess::initialize()
 {
   // Doing initialization here allows to avoid the race between
   // instantiating the ZooKeeper instance and being spawned ourself.
-  watcher = new ProcessWatcher<ZooKeeperStateProcess>(self());
+  watcher = new ProcessWatcher<ZooKeeperStorageProcess>(self());
   zk = new ZooKeeper(servers, timeout, watcher);
 }
 
 
-Future<vector<string> > ZooKeeperStateProcess::names()
+Future<vector<string> > ZooKeeperStorageProcess::names()
 {
   if (error.isSome()) {
     return Future<vector<string> >::failed(error.get());
@@ -115,22 +117,22 @@ Future<vector<string> > ZooKeeperStateProcess::names()
 }
 
 
-Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
+Future<Option<Entry> > ZooKeeperStorageProcess::get(const string& name)
 {
   if (error.isSome()) {
     return Future<Option<Entry> >::failed(error.get());
   } else if (state != CONNECTED) {
-    Fetch* fetch = new Fetch(name);
-    pending.fetches.push(fetch);
-    return fetch->promise.future();
+    Get* get = new Get(name);
+    pending.gets.push(get);
+    return get->promise.future();
   }
 
-  Result<Option<Entry> > result = doFetch(name);
+  Result<Option<Entry> > result = doGet(name);
 
   if (result.isNone()) { // Try again later.
-    Fetch* fetch = new Fetch(name);
-    pending.fetches.push(fetch);
-    return fetch->promise.future();
+    Get* get = new Get(name);
+    pending.gets.push(get);
+    return get->promise.future();
   } else if (result.isError()) {
     return Future<Option<Entry> >::failed(result.error());
   }
@@ -139,22 +141,46 @@ Future<Option<Entry> > ZooKeeperStateProcess::fetch(const string& name)
 }
 
 
-Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
+Future<bool> ZooKeeperStorageProcess::set(const Entry& entry, const UUID& uuid)
+{
+  if (error.isSome()) {
+    return Future<bool>::failed(error.get());
+  } else if (state != CONNECTED) {
+    Set* set = new Set(entry, uuid);
+    pending.sets.push(set);
+    return set->promise.future();
+  }
+
+  Result<bool> result = doSet(entry, uuid);
+
+  if (result.isNone()) { // Try again later.
+    Set* set = new Set(entry, uuid);
+    pending.sets.push(set);
+    return set->promise.future();
+  } else if (result.isError()) {
+    return Future<bool>::failed(result.error());
+  }
+
+  return result.get();
+}
+
+
+Future<bool> ZooKeeperStorageProcess::expunge(const Entry& entry)
 {
   if (error.isSome()) {
     return Future<bool>::failed(error.get());
   } else if (state != CONNECTED) {
-    Swap* swap = new Swap(entry, uuid);
-    pending.swaps.push(swap);
-    return swap->promise.future();
+    Expunge* expunge = new Expunge(entry);
+    pending.expunges.push(expunge);
+    return expunge->promise.future();
   }
 
-  Result<bool> result = doSwap(entry, uuid);
+  Result<bool> result = doExpunge(entry);
 
   if (result.isNone()) { // Try again later.
-    Swap* swap = new Swap(entry, uuid);
-    pending.swaps.push(swap);
-    return swap->promise.future();
+    Expunge* expunge = new Expunge(entry);
+    pending.expunges.push(expunge);
+    return expunge->promise.future();
   } else if (result.isError()) {
     return Future<bool>::failed(result.error());
   }
@@ -163,7 +189,7 @@ Future<bool> ZooKeeperStateProcess::swap(const Entry& entry, const UUID& uuid)
 }
 
 
-void ZooKeeperStateProcess::connected(bool reconnect)
+void ZooKeeperStorageProcess::connected(bool reconnect)
 {
   if (!reconnect) {
     // Authenticate if necessary (and we are connected for the first
@@ -196,43 +222,43 @@ void ZooKeeperStateProcess::connected(bool reconnect)
     delete names;
   }
 
-  while (!pending.fetches.empty()) {
-    Fetch* fetch = pending.fetches.front();
-    Result<Option<Entry> > result = doFetch(fetch->name);
+  while (!pending.gets.empty()) {
+    Get* get = pending.gets.front();
+    Result<Option<Entry> > result = doGet(get->name);
     if (result.isNone()) {
       return; // Try again later.
     } else if (result.isError()) {
-      fetch->promise.fail(result.error());
+      get->promise.fail(result.error());
     } else {
-      fetch->promise.set(result.get());
+      get->promise.set(result.get());
     }
-    pending.fetches.pop();
-    delete fetch;
+    pending.gets.pop();
+    delete get;
   }
 
-  while (!pending.swaps.empty()) {
-    Swap* swap = pending.swaps.front();
-    Result<bool> result = doSwap(swap->entry, swap->uuid);
+  while (!pending.sets.empty()) {
+    Set* set = pending.sets.front();
+    Result<bool> result = doSet(set->entry, set->uuid);
     if (result.isNone()) {
       return; // Try again later.
     } else if (result.isError()) {
-      swap->promise.fail(result.error());
+      set->promise.fail(result.error());
     } else {
-      swap->promise.set(result.get());
+      set->promise.set(result.get());
     }
-    pending.swaps.pop();
-    delete swap;
+    pending.sets.pop();
+    delete set;
   }
 }
 
 
-void ZooKeeperStateProcess::reconnecting()
+void ZooKeeperStorageProcess::reconnecting()
 {
   state = CONNECTING;
 }
 
 
-void ZooKeeperStateProcess::expired()
+void ZooKeeperStorageProcess::expired()
 {
   state = DISCONNECTED;
 
@@ -243,25 +269,25 @@ void ZooKeeperStateProcess::expired()
 }
 
 
-void ZooKeeperStateProcess::updated(const string& path)
+void ZooKeeperStorageProcess::updated(const string& path)
 {
   LOG(FATAL) << "Unexpected ZooKeeper event";
 }
 
 
-void ZooKeeperStateProcess::created(const string& path)
+void ZooKeeperStorageProcess::created(const string& path)
 {
   LOG(FATAL) << "Unexpected ZooKeeper event";
 }
 
 
-void ZooKeeperStateProcess::deleted(const string& path)
+void ZooKeeperStorageProcess::deleted(const string& path)
 {
   LOG(FATAL) << "Unexpected ZooKeeper event";
 }
 
 
-Result<vector<string> > ZooKeeperStateProcess::doNames()
+Result<vector<string> > ZooKeeperStorageProcess::doNames()
 {
   // Get all children to determine current memberships.
   vector<string> results;
@@ -284,7 +310,7 @@ Result<vector<string> > ZooKeeperStateProcess::doNames()
 }
 
 
-Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
+Result<Option<Entry> > ZooKeeperStorageProcess::doGet(const string& name)
 {
   CHECK(error.isNone()) << ": " << error.get();
   CHECK(state == CONNECTED);
@@ -317,7 +343,7 @@ Result<Option<Entry> > ZooKeeperStateProcess::doFetch(const string& name)
 }
 
 
-Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
+Result<bool> ZooKeeperStorageProcess::doSet(const Entry& entry, const UUID& uuid)
 {
   CHECK(error.isNone()) << ": " << error.get();
   CHECK(state == CONNECTED);
@@ -397,7 +423,7 @@ Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
     return false;
   }
 
-  // Okay, do a set, we get atomic swap by requiring 'stat.version'.
+  // Okay, do the set, we get atomicity by requiring 'stat.version'.
   code = zk->set(znode + "/" + entry.name(), data, stat.version);
 
   if (code == ZBADVERSION) {
@@ -414,6 +440,57 @@ Result<bool> ZooKeeperStateProcess::doSwap(const Entry& entry, const UUID& uuid)
   return true;
 }
 
+
+Result<bool> ZooKeeperStorageProcess::doExpunge(const Entry& entry)
+{
+  CHECK(error.isNone()) << ": " << error.get();
+  CHECK(state == CONNECTED);
+
+  string result;
+  Stat stat;
+
+  int code = zk->get(znode + "/" + entry.name(), false, &result, &stat);
+
+  if (code == ZNONODE) {
+    return false;
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    return None(); // Try again later.
+  } else if (code != ZOK) {
+    return Error(
+        "Failed to get '" + znode + "/" + entry.name() +
+        "' in ZooKeeper: " + zk->message(code));
+  }
+
+  google::protobuf::io::ArrayInputStream stream(result.data(), result.size());
+
+  Entry current;
+
+  if (!current.ParseFromZeroCopyStream(&stream)) {
+    return Error("Failed to deserialize Entry");
+  }
+
+  if (UUID::fromBytes(current.uuid()) != UUID::fromBytes(entry.uuid())) {
+    return false;
+  }
+
+  // Okay, do the remove, we get atomicity by requiring 'stat.version'.
+  code = zk->remove(znode + "/" + entry.name(), stat.version);
+
+  if (code == ZBADVERSION) {
+    return false;
+  } else if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    return None(); // Try again later.
+  } else if (code != ZOK) {
+    return Error(
+        "Failed to remove '" + znode + "/" + entry.name() +
+        "' in ZooKeeper: " + zk->message(code));
+  }
+
+  return true;
+}
+
 } // namespace state {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/33f4ff4b/src/state/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/state/zookeeper.hpp b/src/state/zookeeper.hpp
index 1e332ed..90b6607 100644
--- a/src/state/zookeeper.hpp
+++ b/src/state/zookeeper.hpp
@@ -17,8 +17,7 @@
 
 #include "messages/state.hpp"
 
-#include "state/serializer.hpp"
-#include "state/state.hpp"
+#include "state/storage.hpp"
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/watcher.hpp"
@@ -29,51 +28,49 @@ namespace internal {
 namespace state {
 
 // Forward declarations.
-class ZooKeeperStateProcess;
+class ZooKeeperStorageProcess;
 
 
-template <typename Serializer = StringSerializer>
-class ZooKeeperState : public State<Serializer>
+class ZooKeeperStorage : public Storage
 {
 public:
   // TODO(benh): Just take a zookeeper::URL.
-  ZooKeeperState(
+  ZooKeeperStorage(
       const std::string& servers,
       const Duration& timeout,
       const std::string& znode,
       const Option<zookeeper::Authentication>& auth =
       Option<zookeeper::Authentication>());
-  virtual ~ZooKeeperState();
+  virtual ~ZooKeeperStorage();
 
-  // State implementation.
+  // Storage implementation.
+  virtual process::Future<Option<Entry> > get(const std::string& name);
+  virtual process::Future<bool> set(const Entry& entry, const UUID& uuid);
+  virtual process::Future<bool> expunge(const Entry& entry);
   virtual process::Future<std::vector<std::string> > names();
 
-protected:
-  // More State implementation.
-  virtual process::Future<Option<Entry> > fetch(const std::string& name);
-  virtual process::Future<bool> swap(const Entry& entry, const UUID& uuid);
-
 private:
-  ZooKeeperStateProcess* process;
+  ZooKeeperStorageProcess* process;
 };
 
 
-class ZooKeeperStateProcess : public process::Process<ZooKeeperStateProcess>
+class ZooKeeperStorageProcess : public process::Process<ZooKeeperStorageProcess>
 {
 public:
-  ZooKeeperStateProcess(
+  ZooKeeperStorageProcess(
       const std::string& servers,
       const Duration& timeout,
       const std::string& znode,
       const Option<zookeeper::Authentication>& auth);
-  virtual ~ZooKeeperStateProcess();
+  virtual ~ZooKeeperStorageProcess();
 
   virtual void initialize();
 
-  // State implementation.
+  // Storage implementation.
+  process::Future<Option<Entry> > get(const std::string& name);
+  process::Future<bool> set(const Entry& entry, const UUID& uuid);
+  virtual process::Future<bool> expunge(const Entry& entry);
   process::Future<std::vector<std::string> > names();
-  process::Future<Option<Entry> > fetch(const std::string& name);
-  process::Future<bool> swap(const Entry& entry, const UUID& uuid);
 
   // ZooKeeper events.
   void connected(bool reconnect);
@@ -86,8 +83,9 @@ public:
 private:
   // Helpers for getting the names, fetching, and swapping.
   Result<std::vector<std::string> > doNames();
-  Result<Option<Entry> > doFetch(const std::string& name);
-  Result<bool> doSwap(const Entry& entry, const UUID& uuid);
+  Result<Option<Entry> > doGet(const std::string& name);
+  Result<bool> doSet(const Entry& entry, const UUID& uuid);
+  Result<bool> doExpunge(const Entry& entry);
 
   const std::string servers;
   const Duration timeout;
@@ -111,49 +109,56 @@ private:
     process::Promise<std::vector<std::string> > promise;
   };
 
-  struct Fetch
+  struct Get
   {
-    Fetch(const std::string& _name)
+    Get(const std::string& _name)
       : name(_name) {}
     std::string name;
     process::Promise<Option<Entry> > promise;
   };
 
-  struct Swap
+  struct Set
   {
-    Swap(const Entry& _entry, const UUID& _uuid)
+    Set(const Entry& _entry, const UUID& _uuid)
       : entry(_entry), uuid(_uuid) {}
     Entry entry;
     UUID uuid;
     process::Promise<bool> promise;
   };
 
+  struct Expunge
+  {
+    Expunge(const Entry& _entry)
+      : entry(_entry) {}
+    Entry entry;
+    process::Promise<bool> promise;
+  };
+
   // TODO(benh): Make pending a single queue of "operations" that can
   // be "invoked" (C++11 lambdas would help).
   struct {
     std::queue<Names*> names;
-    std::queue<Fetch*> fetches;
-    std::queue<Swap*> swaps;
+    std::queue<Get*> gets;
+    std::queue<Set*> sets;
+    std::queue<Expunge*> expunges;
   } pending;
 
   Option<std::string> error;
 };
 
 
-template <typename Serializer>
-ZooKeeperState<Serializer>::ZooKeeperState(
+inline ZooKeeperStorage::ZooKeeperStorage(
     const std::string& servers,
     const Duration& timeout,
     const std::string& znode,
     const Option<zookeeper::Authentication>& auth)
 {
-  process = new ZooKeeperStateProcess(servers, timeout, znode, auth);
+  process = new ZooKeeperStorageProcess(servers, timeout, znode, auth);
   process::spawn(process);
 }
 
 
-template <typename Serializer>
-ZooKeeperState<Serializer>::~ZooKeeperState()
+inline ZooKeeperStorage::~ZooKeeperStorage()
 {
   process::terminate(process);
   process::wait(process);
@@ -161,27 +166,31 @@ ZooKeeperState<Serializer>::~ZooKeeperState()
 }
 
 
-template <typename Serializer>
-process::Future<std::vector<std::string> > ZooKeeperState<Serializer>::names()
+inline process::Future<Option<Entry> > ZooKeeperStorage::get(
+    const std::string& name)
 {
-  return process::dispatch(process, &ZooKeeperStateProcess::names);
+  return process::dispatch(process, &ZooKeeperStorageProcess::get, name);
 }
 
 
-template <typename Serializer>
-process::Future<Option<Entry> > ZooKeeperState<Serializer>::fetch(
-    const std::string& name)
+inline process::Future<bool> ZooKeeperStorage::set(
+    const Entry& entry,
+    const UUID& uuid)
 {
-  return process::dispatch(process, &ZooKeeperStateProcess::fetch, name);
+  return process::dispatch(process, &ZooKeeperStorageProcess::set, entry, uuid);
 }
 
 
-template <typename Serializer>
-process::Future<bool> ZooKeeperState<Serializer>::swap(
-    const Entry& entry,
-    const UUID& uuid)
+inline process::Future<bool> ZooKeeperStorage::expunge(
+    const Entry& entry)
+{
+  return process::dispatch(process, &ZooKeeperStorageProcess::expunge, entry);
+}
+
+
+inline process::Future<std::vector<std::string> > ZooKeeperStorage::names()
 {
-  return process::dispatch(process, &ZooKeeperStateProcess::swap, entry, uuid);
+  return process::dispatch(process, &ZooKeeperStorageProcess::names);
 }
 
 } // namespace state {