You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/08/03 01:23:43 UTC
svn commit: r1368755 - in /incubator/mesos/trunk/src: ./ java/jni/
java/src/org/apache/mesos/state/ state/
Author: benh
Date: Thu Aug 2 23:23:42 2012
New Revision: 1368755
URL: http://svn.apache.org/viewvc?rev=1368755&view=rev
Log:
Refactored Java implementation of State to use immutable variables and
added an in-memory implementation for testing
(https://reviews.apache.org/r/6079).
Added:
incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java
Modified:
incubator/mesos/trunk/src/Makefile.am
incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp
incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java
incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java
incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java
incubator/mesos/trunk/src/state/state.hpp
Modified: incubator/mesos/trunk/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.am?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.am (original)
+++ incubator/mesos/trunk/src/Makefile.am Thu Aug 2 23:23:42 2012
@@ -475,6 +475,7 @@ MESOS_JAR_SOURCE = \
$(srcdir)/java/src/org/apache/mesos/MesosSchedulerDriver.java \
$(srcdir)/java/src/org/apache/mesos/SchedulerDriver.java \
$(srcdir)/java/src/org/apache/mesos/Scheduler.java \
+ $(srcdir)/java/src/org/apache/mesos/state/InMemoryState.java \
$(srcdir)/java/src/org/apache/mesos/state/State.java \
$(srcdir)/java/src/org/apache/mesos/state/Variable.java \
$(srcdir)/java/src/org/apache/mesos/state/ZooKeeperState.java
Modified: incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp (original)
+++ incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_Variable.cpp Thu Aug 2 23:23:42 2012
@@ -35,22 +35,36 @@ JNIEXPORT jbyteArray JNICALL Java_org_ap
/*
* Class: org_apache_mesos_state_Variable
* Method: mutate
- * Signature: ([B)V
+ * Signature: ([B)Lorg/apache/mesos/state/Variable;
*/
-JNIEXPORT void JNICALL Java_org_apache_mesos_state_Variable_mutate
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_Variable_mutate
(JNIEnv* env, jobject thiz, jbyteArray jvalue)
{
jclass clazz = env->GetObjectClass(thiz);
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- Variable<std::string>* variable =
- (Variable<std::string>*) env->GetLongField(thiz, __variable);
+ // Create a copy of the old variable to support the immutable Java API.
+ Variable<std::string>* variable = new Variable<std::string>(
+ *((Variable<std::string>*) env->GetLongField(thiz, __variable)));
jbyte* value = env->GetByteArrayElements(jvalue, NULL);
jsize length = env->GetArrayLength(jvalue);
+ // Update the value of the new copy.
(*variable)->assign((const char*) value, length);
+
+ env->ReleaseByteArrayElements(jvalue, value, 0);
+
+ // Variable variable = new Variable();
+ clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+ jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+ jobject jvariable = env->NewObject(clazz, _init_);
+
+ env->SetLongField(jvariable, __variable, (jlong) variable);
+
+ return jvariable;
}
Modified: incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp (original)
+++ incubator/mesos/trunk/src/java/jni/org_apache_mesos_state_ZooKeeperState.cpp Thu Aug 2 23:23:42 2012
@@ -11,6 +11,10 @@
using namespace mesos::internal::state;
+using process::Future;
+
+using std::string;
+
extern "C" {
/*
@@ -26,7 +30,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
jobject junit,
jstring jznode)
{
- std::string servers = construct<std::string>(env, jservers);
+ string servers = construct<string>(env, jservers);
jclass clazz = env->GetObjectClass(junit);
@@ -37,7 +41,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
seconds timeout(jseconds);
- std::string znode = construct<std::string>(env, jznode);
+ string znode = construct<string>(env, jznode);
// Create the C++ State and initialize the __state variable.
State<>* state = new ZooKeeperState<>(servers, timeout, znode);
@@ -64,7 +68,7 @@ JNIEXPORT void JNICALL Java_org_apache_m
jstring jscheme,
jbyteArray jcredentials)
{
- std::string servers = construct<std::string>(env, jservers);
+ string servers = construct<string>(env, jservers);
jclass clazz = env->GetObjectClass(junit);
@@ -75,17 +79,17 @@ JNIEXPORT void JNICALL Java_org_apache_m
seconds timeout(jseconds);
- std::string znode = construct<std::string>(env, jznode);
+ string znode = construct<string>(env, jznode);
// Create the C++ State.
State<>* state = NULL;
if (jscheme != NULL && jcredentials != NULL) {
- std::string scheme = construct<std::string>(env, jscheme);
+ string scheme = construct<string>(env, jscheme);
jbyte* temp = env->GetByteArrayElements(jcredentials, NULL);
jsize length = env->GetArrayLength(jcredentials);
- std::string credentials((char*) temp, (size_t) length);
+ string credentials((char*) temp, (size_t) length);
env->ReleaseByteArrayElements(jcredentials, temp, 0);
@@ -127,12 +131,12 @@ JNIEXPORT void JNICALL Java_org_apache_m
/*
* Class: org_apache_mesos_state_ZooKeeperState
* Method: __get
- * Signature: (Ljava/lang/String;)Lorg/apache/mesos/state/Variable;
+ * Signature: (Ljava/lang/String;)J
*/
-JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get
(JNIEnv* env, jobject thiz, jstring jname)
{
- std::string name = construct<std::string>(env, jname);
+ string name = construct<string>(env, jname);
jclass clazz = env->GetObjectClass(thiz);
@@ -140,25 +144,88 @@ JNIEXPORT jobject JNICALL Java_org_apach
State<>* state = (State<>*) env->GetLongField(thiz, __state);
- process::Future<Variable<std::string> > future =
- state->get<std::string>(name);
+ Future<Variable<string> >* future =
+ new Future<Variable<string> >(state->get<string>(name));
+
+ return (jlong) future;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1cancel
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
- future.await();
+ if (!future->isDiscarded()) {
+ future->discard();
+ return (jboolean) future->isDiscarded();
+ }
+
+ return (jboolean) true;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1cancelled
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
- if (future.isFailed()) {
- clazz = env->FindClass("java/util/concurrent/ExecutionException");
- env->ThrowNew(clazz, future.failure().c_str());
- } else if (future.isDiscarded()) {
- clazz = env->FindClass("java/util/concurrent/CancellationException");
+ return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1is_1done
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+ return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_await
+ * Signature: (J)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1await
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+ future->await();
+
+ if (future->isFailed()) {
+ jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
+ jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
}
- CHECK(future.isReady());
+ CHECK(future->isReady());
- Variable<std::string>* variable = new Variable<std::string>(future.get());
+ Variable<string>* variable = new Variable<string>(future->get());
// Variable variable = new Variable();
- clazz = env->FindClass("org/apache/mesos/state/Variable");
+ jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
jobject jvariable = env->NewObject(clazz, _init_);
@@ -172,18 +239,85 @@ JNIEXPORT jobject JNICALL Java_org_apach
/*
* Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_await_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1await_1timeout
+ (JNIEnv* env, jobject thiz, jlong jfuture, jlong jtimeout, jobject junit)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+ jclass clazz = env->GetObjectClass(junit);
+
+ // long seconds = unit.toSeconds(time);
+ jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+ jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+ seconds timeout(jseconds);
+
+ if (future->await(timeout.value)) {
+ if (future->isFailed()) {
+ clazz = env->FindClass("java/util/concurrent/ExecutionException");
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
+ clazz = env->FindClass("java/util/concurrent/CancellationException");
+ env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
+ }
+
+ CHECK(future->isReady());
+ Variable<string>* variable = new Variable<string>(future->get());
+
+ // Variable variable = new Variable();
+ clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+ jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+ jobject jvariable = env->NewObject(clazz, _init_);
+
+ jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+ env->SetLongField(jvariable, __variable, (jlong) variable);
+
+ return jvariable;
+ }
+
+ clazz = env->FindClass("java/util/concurrent/TimeoutException");
+ env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+ return NULL;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __get_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1get_1finalize
+ (JNIEnv* env, jobject thiz, jlong jfuture)
+{
+ Future<Variable<string> >* future = (Future<Variable<string> >*) jfuture;
+
+ delete future;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
* Method: __set
- * Signature: (Lorg/apache/mesos/state/Variable;)B
+ * Signature: (Lorg/apache/mesos/state/Variable;)J
*/
-JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
+JNIEXPORT jlong JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set
(JNIEnv* env, jobject thiz, jobject jvariable)
{
jclass clazz = env->GetObjectClass(jvariable);
jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
- Variable<std::string>* variable =
- (Variable<std::string>*) env->GetLongField(jvariable, __variable);
+ // Create a copy of the old variable to support the immutable Java API.
+ Variable<string>* variable = new Variable<string>(
+ *((Variable<string>*) env->GetLongField(jvariable, __variable)));
clazz = env->GetObjectClass(thiz);
@@ -191,21 +325,194 @@ JNIEXPORT jboolean JNICALL Java_org_apac
State<>* state = (State<>*) env->GetLongField(thiz, __state);
- process::Future<bool> future = state->set(variable);
+ Future<bool>* future = new Future<bool>(state->set(variable));
+
+ return (jlong)
+ new std::pair<Variable<string>*, Future<bool>*>(variable, future);
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_cancel
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1cancel
+ (JNIEnv* env, jobject thiz, jlong jpair)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ Future<bool>* future = pair->second;
- future.await();
+ if (!future->isDiscarded()) {
+ future->discard();
+ return (jboolean) future->isDiscarded();
+ }
+
+ return (jboolean) true;
+}
- if (future.isFailed()) {
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_is_cancelled
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1cancelled
+ (JNIEnv* env, jobject thiz, jlong jpair)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ Future<bool>* future = pair->second;
+
+ return (jboolean) future->isDiscarded();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_is_done
+ * Signature: (J)Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1is_1done
+ (JNIEnv* env, jobject thiz, jlong jpair)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ Future<bool>* future = pair->second;
+
+ return (jboolean) !future->isPending();
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_await
+ * Signature: (J)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1await
+ (JNIEnv* env, jobject thiz, jlong jpair)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ Future<bool>* future = pair->second;
+
+ future->await();
+
+ if (future->isFailed()) {
jclass clazz = env->FindClass("java/util/concurrent/ExecutionException");
- env->ThrowNew(clazz, future.failure().c_str());
- } else if (future.isDiscarded()) {
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
}
- CHECK(future.isReady());
+ CHECK(future->isReady());
+
+ if (future->get()) {
+ // Copy our copy of the old variable to support the immutable Java API.
+ Variable<string>* variable = new Variable<string>(*pair->first);
+
+ // Variable variable = new Variable();
+ jclass clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+ jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+ jobject jvariable = env->NewObject(clazz, _init_);
+
+ jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+ env->SetLongField(jvariable, __variable, (jlong) variable);
+
+ return jvariable;
+ }
+
+ return NULL;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_await_timeout
+ * Signature: (JJLjava/util/concurrent/TimeUnit;)Lorg/apache/mesos/state/Variable;
+ */
+JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1await_1timeout
+ (JNIEnv* env, jobject thiz, jlong jpair, jlong jtimeout, jobject junit)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ Future<bool>* future = pair->second;
+
+ jclass clazz = env->GetObjectClass(junit);
+
+ // long seconds = unit.toSeconds(time);
+ jmethodID toSeconds = env->GetMethodID(clazz, "toSeconds", "(J)J");
+
+ jlong jseconds = env->CallLongMethod(junit, toSeconds, jtimeout);
+
+ seconds timeout(jseconds);
+
+ if (future->await(timeout.value)) {
+ if (future->isFailed()) {
+ clazz = env->FindClass("java/util/concurrent/ExecutionException");
+ env->ThrowNew(clazz, future->failure().c_str());
+ return NULL;
+ } else if (future->isDiscarded()) {
+ clazz = env->FindClass("java/util/concurrent/CancellationException");
+ env->ThrowNew(clazz, "Future was discarded");
+ return NULL;
+ }
+
+ CHECK(future->isReady());
+
+ if (future->get()) {
+ // Copy our copy of the old variable to support the immutable Java API.
+ Variable<string>* variable = new Variable<string>(*pair->first);
+
+ // Variable variable = new Variable();
+ clazz = env->FindClass("org/apache/mesos/state/Variable");
+
+ jmethodID _init_ = env->GetMethodID(clazz, "<init>", "()V");
+ jobject jvariable = env->NewObject(clazz, _init_);
+
+ jfieldID __variable = env->GetFieldID(clazz, "__variable", "J");
+ env->SetLongField(jvariable, __variable, (jlong) variable);
+
+ return jvariable;
+ }
+
+ return NULL;
+ }
+
+ clazz = env->FindClass("java/util/concurrent/TimeoutException");
+ env->ThrowNew(clazz, "Failed to wait for future within timeout");
+
+ return NULL;
+}
+
+
+/*
+ * Class: org_apache_mesos_state_ZooKeeperState
+ * Method: __set_finalize
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_state_ZooKeeperState__1_1set_1finalize
+ (JNIEnv* env, jobject thiz, jlong jpair)
+{
+ std::pair<Variable<string>*, Future<bool>*>* pair =
+ (std::pair<Variable<string>*, Future<bool>*>*) jpair;
+
+ // We can delete the "variable" (i.e., pair->first) because we gave
+ // copies (on the heap) to each of the Java Variable objects.
- return (jboolean) future.get();
+ delete pair->first;
+ delete pair->second;
+ delete pair;
}
} // extern "C" {
Added: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java?rev=1368755&view=auto
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java (added)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/InMemoryState.java Thu Aug 2 23:23:42 2012
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mesos.state;
+
+import java.util.UUID;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+/**
+ * An in-memory implementation of state.
+ */
+public class InMemoryState implements State {
+ @Override
+ public Future<Variable> get(String name) {
+ Entry entry = entries.get(name); // Is null if doesn't exist.
+
+ if (entry == null) {
+ entry = new Entry();
+ entry.name = name;
+ entry.uuid = UUID.randomUUID();
+ entry.value = new byte[0];
+ entries.put(name, entry);
+ entry = entries.putIfAbsent(name, entry);
+ }
+
+ assert entry != null; // ConcurrentMap.putIfAbsent should not return null.
+
+ return futureFrom((Variable) new InMemoryVariable(entry));
+ }
+
+ @Override
+ public Future<Variable> set(Variable v) {
+ InMemoryVariable variable = (InMemoryVariable) v;
+
+ Entry entry = new Entry();
+ entry.name = variable.entry.name;
+ entry.uuid = UUID.randomUUID();
+ entry.value = variable.value;
+
+ if (entries.replace(entry.name, variable.entry, entry)) {
+ return futureFrom((Variable) new InMemoryVariable(entry));
+ }
+
+ return futureFrom(null);
+ }
+
+ private static class InMemoryVariable extends Variable {
+ private InMemoryVariable(Entry entry) {
+ this(entry, null);
+ }
+
+ private InMemoryVariable(Entry entry, byte[] value) {
+ this.entry = entry;
+ this.value = value;
+ }
+
+ @Override
+ public byte[] value() {
+ if (this.value != null) {
+ return this.value;
+ } else {
+ return this.entry.value;
+ }
+ }
+
+ @Override
+ public Variable mutate(byte[] value) {
+ return new InMemoryVariable(entry, value);
+ }
+
+ final Entry entry;
+ final byte[] value;
+ }
+
+ private static class Entry {
+ @Override
+ public boolean equals(Object that) {
+ if (that instanceof Entry) {
+ return uuid.equals(((Entry) that).uuid);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return uuid.hashCode();
+ }
+
+ String name;
+ UUID uuid;
+ byte[] value;
+ }
+
+ private static <T> Future<T> futureFrom(final T t) {
+ FutureTask<T> future = new FutureTask<T>(new Callable<T>() {
+ public T call() {
+ return t;
+ }});
+ future.run();
+ return future;
+ }
+
+ private final ConcurrentMap<String, Entry> entries =
+ new ConcurrentHashMap<String, Entry>();
+}
Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/State.java Thu Aug 2 23:23:42 2012
@@ -38,19 +38,21 @@ import java.util.concurrent.Future;
* State state = new ZooKeeperState();
* Future<Variable> variable = state.get("machines");
* Variable machines = variable.get();
- * machines.mutate(...);
- * Future<Boolean> set = state.set(machines);
+ * machines = machines.mutate(...);
+ * variable = state.set(machines);
+ * machines = variable.get();
*/
public interface State {
/**
- * Returns a "variable" representing the value from the state
- * associated with the specified name.
+ * Returns an immutable "variable" representing the current value
+ * from the state associated with the specified name.
*/
Future<Variable> get(String name);
/**
- * Returns true if successfully updating the specified "variable"
- * in the state.
+ * Returns an immutable "variable" representing the current value in
+ * the state if updating the specified variable in the state was
+ * successful, otherwise returns null.
*/
- Future<Boolean> set(Variable variable);
+ Future<Variable> set(Variable variable);
}
Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/Variable.java Thu Aug 2 23:23:42 2012
@@ -19,7 +19,7 @@
package org.apache.mesos.state;
public class Variable {
- private Variable() {} // Only constructable by native code.
+ protected Variable() {}
/**
* Returns the current value of this variable.
@@ -29,7 +29,7 @@ public class Variable {
/**
* Updates the current value of this variable.
*/
- public native void mutate(byte[] value);
+ public native Variable mutate(byte[] value);
protected native void finalize();
Modified: incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java (original)
+++ incubator/mesos/trunk/src/java/src/org/apache/mesos/state/ZooKeeperState.java Thu Aug 2 23:23:42 2012
@@ -18,9 +18,9 @@
package org.apache.mesos.state;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.MesosNativeLibrary;
@@ -72,22 +72,82 @@ class ZooKeeperState implements State {
@Override
public Future<Variable> get(final String name) {
- // TODO(benh): Asynchronously start the operation before returning.
- return new FutureTask<Variable>(new Callable<Variable>() {
- public Variable call() {
- return __get(name);
+ final long future = __get(name); // Asynchronously start the operation.
+ return new Future<Variable>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (mayInterruptIfRunning) {
+ return __get_cancel(future);
}
- });
+ return false; // Should not interrupt and already running (or finished).
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return __get_is_cancelled(future);
+ }
+
+ @Override
+ public boolean isDone() {
+ return __get_is_done(future);
+ }
+
+ @Override
+ public Variable get() throws InterruptedException, ExecutionException {
+ return __get_await(future);
+ }
+
+ @Override
+ public Variable get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return __get_await_timeout(future, timeout, unit);
+ }
+
+ @Override
+ protected void finalize() {
+ __get_finalize(future);
+ }
+ };
}
@Override
- public Future<Boolean> set(final Variable variable) {
- // TODO(benh): Asynchronously start the operation before returning.
- return new FutureTask<Boolean>(new Callable<Boolean>() {
- public Boolean call() {
- return __set(variable);
+ public Future<Variable> set(Variable variable) {
+ final long pair = __set(variable); // Asynchronously start the operation.
+ return new Future<Variable>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (mayInterruptIfRunning) {
+ return __set_cancel(pair);
}
- });
+ return false; // Should not interrupt and already running (or finished).
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return __set_is_cancelled(pair);
+ }
+
+ @Override
+ public boolean isDone() {
+ return __set_is_done(pair);
+ }
+
+ @Override
+ public Variable get() throws InterruptedException, ExecutionException {
+ return __set_await(pair);
+ }
+
+ @Override
+ public Variable get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return __set_await_timeout(pair, timeout, unit);
+ }
+
+ @Override
+ protected void finalize() {
+ __set_finalize(pair);
+ }
+ };
}
protected native void initialize(String servers,
@@ -105,8 +165,23 @@ class ZooKeeperState implements State {
protected native void finalize();
// Native implementations of get/set.
- private native Variable __get(String name);
- private native boolean __set(Variable variable);
+ private native long __get(String name);
+ private native boolean __get_cancel(long future);
+ private native boolean __get_is_cancelled(long future);
+ private native boolean __get_is_done(long future);
+ private native Variable __get_await(long future);
+ private native Variable __get_await_timeout(
+ long future, long timeout, TimeUnit unit);
+ private native void __get_finalize(long future);
+
+ private native long __set(Variable variable);
+ private native boolean __set_cancel(long pair);
+ private native boolean __set_is_cancelled(long pair);
+ private native boolean __set_is_done(long pair);
+ private native Variable __set_await(long pair);
+ private native Variable __set_await_timeout(
+ long pair, long timeout, TimeUnit unit);
+ private native void __set_finalize(long pair);
private long __state;
};
Modified: incubator/mesos/trunk/src/state/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/state/state.hpp?rev=1368755&r1=1368754&r2=1368755&view=diff
==============================================================================
--- incubator/mesos/trunk/src/state/state.hpp (original)
+++ incubator/mesos/trunk/src/state/state.hpp Thu Aug 2 23:23:42 2012
@@ -72,11 +72,6 @@ public:
return &t;
}
- void mutate(const T& _t)
- {
- t = _t;
- }
-
private:
template <typename Serializer>
friend class State; // Creates and manages variables.