You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/03 01:23:43 UTC

svn commit: r1368755 - in /incubator/mesos/trunk/src: ./ java/jni/ java/src/org/apache/mesos/state/ state/

Author: benh
Date: Thu Aug  2 23:23:42 2012
New Revision: 1368755

URL: http://svn.apache.org/viewvc?rev=1368755&view=rev
Log:
Refactored Java implementation of State to use immutable variables and
added an in-memory implementation for testing
(https://reviews.apache.org/r/6079).

Added:
    incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java
Modified:
    incubator/mesos/trunk/src/Makefile.am
    incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp
    incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
    incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java
    incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java
    incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java
    incubator/mesos/trunk/src/state/state.hpp

Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Thu Aug  2 23:23:42 2012
@@ -475,6 +475,7 @@ MESOS_JAR_SOURCE =							\
 	$(srcdir)/java/src/org/apache/mesos/MesosSchedulerDriver.java	\
 	$(srcdir)/java/src/org/apache/mesos/SchedulerDriver.java	\
 	$(srcdir)/java/src/org/apache/mesos/Scheduler.java		\
+	$(srcdir)/java/src/org/apache/mesos/state/InMemoryState.java	\
 	$(srcdir)/java/src/org/apache/mesos/state/State.java		\
 	$(srcdir)/java/src/org/apache/mesos/state/Variable.java		\
 	$(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java

Modified: incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp (original)
+++ incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp Thu Aug  2 23:23:42 2012
@@ -35,22 +35,36 @@ JNIEXPORT jbyteArray JNICALL Java_org_ap
 /*
  * Class:     org_apache_mesos_state_Variable
  * Method:    mutate
- * Signature: ([B)V
+ * Signature: ([B)Lorg/apache/mesos/state/Variable;
  */
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_Variable_mutate
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_Variable_mutate
   (JNIEnv* env, jobject thiz, jbyteArray jvalue)
 {
   jclass clazz = env->GetObjectClass(thiz);
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  Variable<std::string>* variable =
-    (Variable<std::string>*) env->GetLongField(thiz, __variable);
+  // Create a copy of the old variable to support the immutable Java API.
+  Variable<std::string>* variable = new Variable<std::string>(
+      *((Variable<std::string>*) env->GetLongField(thiz, __variable)));
 
   jbyte* value = env->GetByteArrayElements(jvalue, NULL);
   jsize length = env->GetArrayLength(jvalue);
 
+  // Update the value of the new copy.
   (*variable)->assign((const char*) value, length);
+
+  env->ReleaseByteArrayElements(jvalue, value, 0);
+
+  // Variable variable = new Variable();
+  clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+  jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+  jobject jvariable = env->NewObject(clazz, _init_);
+
+  env->SetLongField(jvariable, __variable, (jlong) variable);
+
+  return jvariable;
 }
 
 

Modified: incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp (original)
+++ incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp Thu Aug  2 23:23:42 2012
@@ -11,6 +11,10 @@
 
 using namespace mesos::internal::state;
 
+using process::Future;
+
+using std::string;
+
 extern "C" {
 
 /*
@@ -26,7 +30,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
    jobject junit,
    jstring jznode)
 {
-  std::string servers = construct<std::string>(env, jservers);
+  string servers = construct<string>(env, jservers);
 
   jclass clazz = env->GetObjectClass(junit);
 
@@ -37,7 +41,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
 
   seconds timeout(jseconds);
 
-  std::string znode = construct<std::string>(env, jznode);
+  string znode = construct<string>(env, jznode);
 
    // Create the C++ State and initialize the __state variable.
   State<>* state = new ZooKeeperState<>(servers, timeout, znode);
@@ -64,7 +68,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
    jstring jscheme,
    jbyteArray jcredentials)
 {
-  std::string servers = construct<std::string>(env, jservers);
+  string servers = construct<string>(env, jservers);
 
   jclass clazz = env->GetObjectClass(junit);
 
@@ -75,17 +79,17 @@ JNIEXPORT void JNICALL Java_org_apache_m
 
   seconds timeout(jseconds);
 
-  std::string znode = construct<std::string>(env, jznode);
+  string znode = construct<string>(env, jznode);
 
   // Create the C++ State.
   State<>* state = NULL;
   if (jscheme != NULL && jcredentials != NULL) {
-    std::string scheme = construct<std::string>(env, jscheme);
+    string scheme = construct<string>(env, jscheme);
 
     jbyte* temp = env->GetByteArrayElements(jcredentials, NULL);
     jsize length = env->GetArrayLength(jcredentials);
 
-    std::string credentials((char*) temp, (size_t) length);
+    string credentials((char*) temp, (size_t) length);
 
     env->ReleaseByteArrayElements(jcredentials, temp, 0);
 
@@ -127,12 +131,12 @@ JNIEXPORT void JNICALL Java_org_apache_m
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
  * Method:    __get
- * Signature: (Ljava/lang/String;)Lorg/apache/mesos/state/Variable;
+ * Signature: (Ljava/lang/String;)J
  */
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
   (JNIEnv* env, jobject thiz, jstring jname)
 {
-  std::string name = construct<std::string>(env, jname);
+  string name = construct<string>(env, jname);
 
   jclass clazz = env->GetObjectClass(thiz);
 
@@ -140,25 +144,88 @@ JNIEXPORT jobject JNICALL Java_org_apach
 
   State<>* state = (State<>*) env->GetLongField(thiz, __state);
 
-  process::Future<Variable<std::string> > future =
-    state->get<std::string>(name);
+  Future<Variable<string> >* future =
+    new Future<Variable<string> >(state->get<string>(name));
+
+  return (jlong) future;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1cancel
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
 
-  future.await();
+  if (!future->isDiscarded()) {
+    future->discard();
+    return (jboolean) future->isDiscarded();
+  }
+
+  return (jboolean) true;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1cancelled
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
 
-  if (future.isFailed()) {
-    clazz = env->FindClass("java/util/concurrent/ExecutionException");
-    env->ThrowNew(clazz, future.failure().c_str());
-  } else if (future.isDiscarded()) {
-    clazz = env->FindClass("java/util/concurrent/CancellationException");
+  return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1done
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+  return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_await
+ * Signature: (J)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1await
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+  future->await();
+
+  if (future->isFailed()) {
+    jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
+    env->ThrowNew(clazz, future->failure().c_str());
+    return NULL;
+  } else if (future->isDiscarded()) {
+    jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
     env->ThrowNew(clazz, "Future was discarded");
+    return NULL;
   }
 
-  CHECK(future.isReady());
+  CHECK(future->isReady());
 
-  Variable<std::string>* variable = new Variable<std::string>(future.get());
+  Variable<string>* variable = new Variable<string>(future->get());
 
   // Variable variable = new Variable();
-   clazz = env->FindClass("org/apache/mesos/state/Variable");
+  jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
 
   jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
   jobject jvariable = env->NewObject(clazz, _init_);
@@ -172,18 +239,85 @@ JNIEXPORT jobject JNICALL Java_org_apach
 
 /*
  * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_await_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1await_1timeout
+  (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+  jclass clazz = env->GetObjectClass(junit);
+
+  // long seconds = unit.toSeconds(time);
+  jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+  jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+  seconds timeout(jseconds);
+
+  if (future->await(timeout.value)) {
+    if (future->isFailed()) {
+      clazz = env->FindClass("java/util/concurrent/ExecutionException");
+      env->ThrowNew(clazz, future->failure().c_str());
+      return NULL;
+    } else if (future->isDiscarded()) {
+      clazz = env->FindClass("java/util/concurrent/CancellationException");
+      env->ThrowNew(clazz, "Future was discarded");
+      return NULL;
+    }
+
+    CHECK(future->isReady());
+    Variable<string>* variable = new Variable<string>(future->get());
+
+    // Variable variable = new Variable();
+    clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+    jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+    jobject jvariable = env->NewObject(clazz, _init_);
+
+    jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+    env->SetLongField(jvariable, __variable, (jlong) variable);
+
+    return jvariable;
+  }
+
+  clazz = env->FindClass("java/util/concurrent/TimeoutException");
+  env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+  return NULL;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __get_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1finalize
+  (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+  Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+  delete future;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
  * Method:    __set
- * Signature: (Lorg/apache/mesos/state/Variable;)B
+ * Signature: (Lorg/apache/mesos/state/Variable;)J
  */
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
   (JNIEnv* env, jobject thiz, jobject jvariable)
 {
   jclass clazz = env->GetObjectClass(jvariable);
 
   jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
 
-  Variable<std::string>* variable =
-    (Variable<std::string>*) env->GetLongField(jvariable, __variable);
+  // Create a copy of the old variable to support the immutable Java API.
+  Variable<string>* variable = new Variable<string>(
+      *((Variable<string>*) env->GetLongField(jvariable, __variable)));
 
   clazz = env->GetObjectClass(thiz);
 
@@ -191,21 +325,194 @@ JNIEXPORT jboolean JNICALL Java_org_apac
 
   State<>* state = (State<>*) env->GetLongField(thiz, __state);
 
-  process::Future<bool> future = state->set(variable);
+  Future<bool>* future = new Future<bool>(state->set(variable));
+
+  return (jlong)
+    new std::pair<Variable<string>*, Future<bool>*>(variable, future);
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1cancel
+  (JNIEnv* env, jobject thiz, jlong jpair)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  Future<bool>* future = pair->second;
 
-  future.await();
+  if (!future->isDiscarded()) {
+    future->discard();
+    return (jboolean) future->isDiscarded();
+  }
+
+  return (jboolean) true;
+}
 
-  if (future.isFailed()) {
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1cancelled
+  (JNIEnv* env, jobject thiz, jlong jpair)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  Future<bool>* future = pair->second;
+
+  return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1done
+  (JNIEnv* env, jobject thiz, jlong jpair)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  Future<bool>* future = pair->second;
+
+  return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_await
+ * Signature: (J)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1await
+  (JNIEnv* env, jobject thiz, jlong jpair)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  Future<bool>* future = pair->second;
+
+  future->await();
+
+  if (future->isFailed()) {
     jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
-    env->ThrowNew(clazz, future.failure().c_str());
-  } else if (future.isDiscarded()) {
+    env->ThrowNew(clazz, future->failure().c_str());
+    return NULL;
+  } else if (future->isDiscarded()) {
     jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
     env->ThrowNew(clazz, "Future was discarded");
+    return NULL;
   }
 
-  CHECK(future.isReady());
+  CHECK(future->isReady());
+
+  if (future->get()) {
+    // Copy our copy of the old variable to support the immutable Java API.
+    Variable<string>* variable = new Variable<string>(*pair->first);
+
+    // Variable variable = new Variable();
+    jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+    jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+    jobject jvariable = env->NewObject(clazz, _init_);
+
+    jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+    env->SetLongField(jvariable, __variable, (jlong) variable);
+
+    return jvariable;
+  }
+
+  return NULL;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_await_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1await_1timeout
+  (JNIEnv* env, jobject thiz, jlong jpair, jlong jtimeout, jobject junit)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  Future<bool>* future = pair->second;
+
+  jclass clazz = env->GetObjectClass(junit);
+
+  // long seconds = unit.toSeconds(time);
+  jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+  jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+  seconds timeout(jseconds);
+
+  if (future->await(timeout.value)) {
+    if (future->isFailed()) {
+      clazz = env->FindClass("java/util/concurrent/ExecutionException");
+      env->ThrowNew(clazz, future->failure().c_str());
+      return NULL;
+    } else if (future->isDiscarded()) {
+      clazz = env->FindClass("java/util/concurrent/CancellationException");
+      env->ThrowNew(clazz, "Future was discarded");
+      return NULL;
+    }
+
+    CHECK(future->isReady());
+
+    if (future->get()) {
+      // Copy our copy of the old variable to support the immutable Java API.
+      Variable<string>* variable = new Variable<string>(*pair->first);
+
+      // Variable variable = new Variable();
+      clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+      jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+      jobject jvariable = env->NewObject(clazz, _init_);
+
+      jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+      env->SetLongField(jvariable, __variable, (jlong) variable);
+
+      return jvariable;
+    }
+
+    return NULL;
+  }
+
+  clazz = env->FindClass("java/util/concurrent/TimeoutException");
+  env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+  return NULL;
+}
+
+
+/*
+ * Class:     org_apache_mesos_state_ZooKeeperState
+ * Method:    __set_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1finalize
+  (JNIEnv* env, jobject thiz, jlong jpair)
+{
+  std::pair<Variable<string>*, Future<bool>*>* pair =
+    (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+  // We can delete the "variable" (i.e., pair->first) because we gave
+  // copies (on the heap) to each of the Java Variable objects.
 
-  return (jboolean) future.get();
+  delete pair->first;
+  delete pair->second;
+  delete pair;
 }
 
 } // extern "C" {

Added: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java?rev=1368755&view=auto
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java (added)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java Thu Aug  2 23:23:42 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.UUID;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+/**
+ * An in-memory implementation of state.
+ */
+public class InMemoryState implements State {
+  @Override
+  public Future<Variable> get(String name) {
+    Entry entry = entries.get(name); // Is null if doesn't exist.
+
+    if (entry == null) {
+      entry = new Entry();
+      entry.name = name;
+      entry.uuid = UUID.randomUUID();
+      entry.value = new byte[0];
+      entries.put(name, entry);
+      entry = entries.putIfAbsent(name, entry);
+    }
+
+    assert entry != null; // ConcurrentMap.putIfAbsent should not return null.
+
+    return futureFrom((Variable) new InMemoryVariable(entry));
+  }
+
+  @Override
+  public Future<Variable> set(Variable v) {
+    InMemoryVariable variable = (InMemoryVariable) v;
+
+    Entry entry = new Entry();
+    entry.name = variable.entry.name;
+    entry.uuid = UUID.randomUUID();
+    entry.value = variable.value;
+
+    if (entries.replace(entry.name, variable.entry, entry)) {
+      return futureFrom((Variable) new InMemoryVariable(entry));
+    }
+
+    return futureFrom(null);
+  }
+
+  private static class InMemoryVariable extends Variable {
+    private InMemoryVariable(Entry entry) {
+      this(entry, null);
+    }
+
+    private InMemoryVariable(Entry entry, byte[] value) {
+      this.entry = entry;
+      this.value = value;
+    }
+
+    @Override
+    public byte[] value() {
+      if (this.value != null) {
+        return this.value;
+      } else {
+        return this.entry.value;
+      }
+    }
+
+    @Override
+    public Variable mutate(byte[] value) {
+      return new InMemoryVariable(entry, value);
+    }
+
+    final Entry entry;
+    final byte[] value;
+  }
+
+  private static class Entry {
+    @Override
+    public boolean equals(Object that) {
+      if (that instanceof Entry) {
+        return uuid.equals(((Entry) that).uuid);
+      }
+
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return uuid.hashCode();
+    }
+
+    String name;
+    UUID uuid;
+    byte[] value;
+  }
+
+  private static <T> Future<T> futureFrom(final T t) {
+    FutureTask<T> future = new FutureTask<T>(new Callable<T>() {
+        public T call() {
+          return t;
+        }});
+    future.run();
+    return future;
+  }
+
+  private final ConcurrentMap<String, Entry> entries =
+    new ConcurrentHashMap<String, Entry>();
+}

Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java Thu Aug  2 23:23:42 2012
@@ -38,19 +38,21 @@ import java.util.concurrent.Future;
  *   State state = new ZooKeeperState();
  *   Future<Variable> variable = state.get("machines");
  *   Variable machines = variable.get();
- *   machines.mutate(...);
- *   Future<Boolean> set = state.set(machines);
+ *   machines = machines.mutate(...);
+ *   variable = state.set(machines);
+ *   machines = variable.get();
  */
 public interface State {
   /**
-   * Returns a "variable" representing the value from the state
-   * associated with the specified name.
+   * Returns an immutable "variable" representing the current value
+   * from the state associated with the specified name.
    */
   Future<Variable> get(String name);
 
   /**
-   * Returns true if successfully updating the specified "variable"
-   * in the state.
+   * Returns an immutable "variable" representing the current value in
+   * the state if updating the specified variable in the state was
+   * successful, otherwise returns null.
    */
-  Future<Boolean> set(Variable variable);
+  Future<Variable> set(Variable variable);
 }

Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java Thu Aug  2 23:23:42 2012
@@ -19,7 +19,7 @@
 package org.apache.mesos.state;
 
 public class Variable {
-  private Variable() {} // Only constructable by native code.
+  protected Variable() {}
 
   /**
    * Returns the current value of this variable.
@@ -29,7 +29,7 @@ public class Variable {
   /**
    * Updates the current value of this variable.
    */
-  public native void mutate(byte[] value);
+  public native Variable mutate(byte[] value);
 
   protected native void finalize();
 

Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java Thu Aug  2 23:23:42 2012
@@ -18,9 +18,9 @@
 
 package org.apache.mesos.state;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.mesos.MesosNativeLibrary;
@@ -72,22 +72,82 @@ class ZooKeeperState implements State {
 
   @Override
   public Future<Variable> get(final String name) {
-    // TODO(benh): Asynchronously start the operation before returning.
-    return new FutureTask<Variable>(new Callable<Variable>() {
-        public Variable call() {
-          return __get(name);
+    final long future = __get(name); // Asynchronously start the operation.
+    return new Future<Variable>() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (mayInterruptIfRunning) {
+          return __get_cancel(future);
         }
-      });
+        return false; // Should not interrupt and already running (or finished).
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return __get_is_cancelled(future);
+      }
+
+      @Override
+      public boolean isDone() {
+        return __get_is_done(future);
+      }
+
+      @Override
+      public Variable get() throws InterruptedException, ExecutionException {
+        return __get_await(future);
+      }
+
+      @Override
+      public Variable get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return __get_await_timeout(future, timeout, unit);
+      }
+
+      @Override
+      protected void finalize() {
+        __get_finalize(future);
+      }
+    };
   }
 
   @Override
-  public Future<Boolean> set(final Variable variable) {
-    // TODO(benh): Asynchronously start the operation before returning.
-    return new FutureTask<Boolean>(new Callable<Boolean>() {
-        public Boolean call() {
-          return __set(variable);
+  public Future<Variable> set(Variable variable) {
+    final long pair = __set(variable); // Asynchronously start the operation.
+    return new Future<Variable>() {
+      @Override
+      public boolean cancel(boolean mayInterruptIfRunning) {
+        if (mayInterruptIfRunning) {
+          return __set_cancel(pair);
         }
-      });
+        return false; // Should not interrupt and already running (or finished).
+      }
+
+      @Override
+      public boolean isCancelled() {
+        return __set_is_cancelled(pair);
+      }
+
+      @Override
+      public boolean isDone() {
+        return __set_is_done(pair);
+      }
+
+      @Override
+      public Variable get() throws InterruptedException, ExecutionException {
+        return __set_await(pair);
+      }
+
+      @Override
+      public Variable get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+        return __set_await_timeout(pair, timeout, unit);
+      }
+
+      @Override
+      protected void finalize() {
+        __set_finalize(pair);
+      }
+    };
   }
 
   protected native void initialize(String servers,
@@ -105,8 +165,23 @@ class ZooKeeperState implements State {
   protected native void finalize();
 
   // Native implementations of get/set.
-  private native Variable __get(String name);
-  private native boolean __set(Variable variable);
+  private native long __get(String name);
+  private native boolean __get_cancel(long future);
+  private native boolean __get_is_cancelled(long future);
+  private native boolean __get_is_done(long future);
+  private native Variable __get_await(long future);
+  private native Variable __get_await_timeout(
+      long future, long timeout, TimeUnit unit);
+  private native void __get_finalize(long future);
+
+  private native long __set(Variable variable);
+  private native boolean __set_cancel(long pair);
+  private native boolean __set_is_cancelled(long pair);
+  private native boolean __set_is_done(long pair);
+  private native Variable __set_await(long pair);
+  private native Variable __set_await_timeout(
+      long pair, long timeout, TimeUnit unit);
+  private native void __set_finalize(long pair);
 
   private long __state;
 };

Modified: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (original)
+++ incubator/mesos/trunk/src/state/state.hpp Thu Aug  2 23:23:42 2012
@@ -72,11 +72,6 @@ public:
     return &t;
   }
 
-  void mutate(const T& _t)
-  {
-    t = _t;
-  }
-
 private:
   template <typename Serializer>
   friend class State; // Creates and manages variables.