You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/02/20 08:47:47 UTC
[2/5] git commit: Updated Mesos to use new libprocess discard semantics.
Updated Mesos to use new libprocess discard semantics.
Review: https://reviews.apache.org/r/17686
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/24a31366
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/24a31366
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/24a31366
Branch: refs/heads/master
Commit: 24a3136646dc4a89836ad1bc8a08675814ba32f2
Parents: 0178e0f
Author: Benjamin Hindman <be...@gmail.com>
Authored: Sat Feb 1 18:08:00 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Feb 19 23:45:52 2014 -0800
----------------------------------------------------------------------
.../org_apache_mesos_state_AbstractState.cpp | 96 ++++++++++++--------
src/linux/cgroups.cpp | 68 ++++++++------
src/log/catchup.cpp | 13 ++-
src/log/consensus.cpp | 19 +++-
src/log/log.cpp | 8 ++
src/log/recover.cpp | 6 +-
src/master/detector.cpp | 4 +-
src/master/registrar.cpp | 3 +-
src/sasl/authenticatee.hpp | 18 +++-
src/sasl/authenticator.hpp | 20 +++-
src/slave/gc.cpp | 6 +-
src/slave/gc.hpp | 3 +
src/zookeeper/contender.cpp | 6 +-
src/zookeeper/group.cpp | 2 +-
14 files changed, 190 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/java/jni/org_apache_mesos_state_AbstractState.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_state_AbstractState.cpp b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
index 2ee0b1b..0c7aebf 100644
--- a/src/java/jni/org_apache_mesos_state_AbstractState.cpp
+++ b/src/java/jni/org_apache_mesos_state_AbstractState.cpp
@@ -78,12 +78,11 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1
{
Future<Variable>* future = (Future<Variable>*) jfuture;
- if (!future->isDiscarded()) {
- future->discard();
- return (jboolean) future->isDiscarded();
- }
+ // We'll initiate a discard but we won't consider it cancelled since
+ // we don't know if/when the future will get discarded.
+ future->discard();
- return (jboolean) true;
+ return (jboolean) false;
}
@@ -95,9 +94,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Variable>* future = (Future<Variable>*) jfuture;
-
- return (jboolean) future->isDiscarded();
+ // We always return false since while we might discard the future in
+ // 'cancel' we don't know if it has really been discarded and we
+ // don't want this function to block. We choose to be deterministic
+ // here and always return false rather than sometimes returning true
+ // if the future has completed (been discarded or otherwise).
+ return (jboolean) false;
}
@@ -111,7 +113,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1
{
Future<Variable>* future = (Future<Variable>*) jfuture;
- return (jboolean) !future->isPending();
+ return (jboolean) !future->isPending() || future->hasDiscard();
}
@@ -132,6 +134,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -179,6 +183,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1fetch_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -257,12 +263,11 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1
{
Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
- if (!future->isDiscarded()) {
- future->discard();
- return (jboolean) future->isDiscarded();
- }
+ // We'll initiate a discard but we won't consider it cancelled since
+ // we don't know if/when the future will get discarded.
+ future->discard();
- return (jboolean) true;
+ return (jboolean) false;
}
@@ -274,9 +279,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
-
- return (jboolean) future->isDiscarded();
+ // We always return false since while we might discard the future in
+ // 'cancel' we don't know if it has really been discarded and we
+ // don't want this function to block. We choose to be deterministic
+ // here and always return false rather than sometimes returning true
+ // if the future has completed (been discarded or otherwise).
+ return (jboolean) false;
}
@@ -290,7 +298,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1
{
Future<Option<Variable> >* future = (Future<Option<Variable> >*) jfuture;
- return (jboolean) !future->isPending();
+ return (jboolean) !future->isPending() || future->hasDiscard();
}
@@ -311,6 +319,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -362,6 +372,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1store_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -444,12 +456,11 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge
{
Future<bool>* future = (Future<bool>*) jfuture;
- if (!future->isDiscarded()) {
- future->discard();
- return (jboolean) future->isDiscarded();
- }
+ // We'll initiate a discard but we won't consider it cancelled since
+ // we don't know if/when the future will get discarded.
+ future->discard();
- return (jboolean) true;
+ return (jboolean) false;
}
@@ -461,9 +472,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<bool>* future = (Future<bool>*) jfuture;
-
- return (jboolean) future->isDiscarded();
+ // We always return false since while we might discard the future in
+ // 'cancel' we don't know if it has really been discarded and we
+ // don't want this function to block. We choose to be deterministic
+ // here and always return false rather than sometimes returning true
+ // if the future has completed (been discarded or otherwise).
+ return (jboolean) false;
}
@@ -477,7 +491,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge
{
Future<bool>* future = (Future<bool>*) jfuture;
- return (jboolean) !future->isPending();
+ return (jboolean) !future->isPending() || future->hasDiscard();
}
@@ -498,6 +512,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge_
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -542,6 +558,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1expunge_
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -612,12 +630,11 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
{
Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
- if (!future->isDiscarded()) {
- future->discard();
- return (jboolean) future->isDiscarded();
- }
+ // We'll initiate a discard but we won't consider it cancelled since
+ // we don't know if/when the future will get discarded.
+ future->discard();
- return (jboolean) true;
+ return (jboolean) false;
}
/*
@@ -628,9 +645,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1is_1cancelled
(JNIEnv* env, jobject thiz, jlong jfuture)
{
- Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
-
- return (jboolean) future->isDiscarded();
+ // We always return false since while we might discard the future in
+ // 'cancel' we don't know if it has really been discarded and we
+ // don't want this function to block. We choose to be deterministic
+ // here and always return false rather than sometimes returning true
+ // if the future has completed (been discarded or otherwise).
+ return (jboolean) false;
}
@@ -644,7 +664,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1
{
Future<vector<string> >* future = (Future<vector<string> >*) jfuture;
- return (jboolean) !future->isPending();
+ return (jboolean) !future->isPending() || future->hasDiscard();
}
@@ -665,6 +685,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
jclass clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
@@ -719,6 +741,8 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_state_AbstractState__1_1names_1g
env->ThrowNew(clazz, future->failure().c_str());
return NULL;
} else if (future->isDiscarded()) {
+ // TODO(benh): Consider throwing an ExecutionException since we
+ // never return true for 'isCancelled'.
clazz = env->FindClass("java/util/concurrent/CancellationException");
env->ThrowNew(clazz, "Future was discarded");
return NULL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/linux/cgroups.cpp
----------------------------------------------------------------------
diff --git a/src/linux/cgroups.cpp b/src/linux/cgroups.cpp
index 8ac2599..baeb35d 100644
--- a/src/linux/cgroups.cpp
+++ b/src/linux/cgroups.cpp
@@ -1181,7 +1181,7 @@ protected:
// Stop the listener if no one cares. Note that here we explicitly specify
// the type of the terminate function because it is an overloaded function.
// The compiler complains if we do not do it.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
// Register an eventfd "notifier" for the given control.
@@ -1215,6 +1215,10 @@ protected:
LOG(ERROR) << "Failed to unregistering eventfd: " << unregister.error();
}
}
+
+ // TODO(benh): Discard our promise only after 'reading' has
+ // completed (ready, failed, or discarded).
+ promise.discard();
}
private:
@@ -1222,24 +1226,14 @@ private:
// result, either because the event has happened, or an error has occurred.
void notified(const Future<size_t>&)
{
- // Ignore this function if the promise is no longer pending.
- if (!promise.future().isPending()) {
- return;
- }
-
- // Since the future reading can only be discarded when the promise is no
- // longer pending, we shall never see a discarded reading here because of
- // the check in the beginning of the function.
- CHECK(!reading.isDiscarded());
-
- if (reading.isFailed()) {
+ if (reading.isDiscarded()) {
+ promise.discard();
+ } else if (reading.isFailed()) {
promise.fail("Failed to read eventfd: " + reading.failure());
+ } else if (reading.get() == sizeof(data)) {
+ promise.set(data);
} else {
- if (reading.get() == sizeof(data)) {
- promise.set(data);
- } else {
- promise.fail("Read less than expected");
- }
+ promise.fail("Read less than expected");
}
terminate(self());
@@ -1304,7 +1298,7 @@ protected:
virtual void initialize()
{
// Stop the process if no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
CHECK(interval >= Seconds(0));
@@ -1318,6 +1312,11 @@ protected:
}
}
+ virtual void finalize()
+ {
+ promise.discard();
+ }
+
private:
void freeze()
{
@@ -1562,7 +1561,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void (*)(const UPID&, bool)>(terminate), self(), true));
CHECK(interval >= Seconds(0));
@@ -1570,6 +1569,11 @@ protected:
check();
}
+ virtual void finalize()
+ {
+ promise.discard();
+ }
+
private:
void check(unsigned int attempt = 0)
{
@@ -1624,7 +1628,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
CHECK(interval >= Seconds(0));
@@ -1635,8 +1639,12 @@ protected:
virtual void finalize()
{
// Cancel the chain of operations if the user discards the future.
- if (promise.future().isDiscarded()) {
+ if (promise.future().hasDiscard()) {
chain.discard();
+
+ // TODO(benh): Discard our promise only after 'chain' has
+ // completed (ready, failed, or discarded).
+ promise.discard();
}
}
@@ -1689,8 +1697,10 @@ private:
void finished(const Future<bool>& empty)
{
- CHECK(!empty.isPending() && !empty.isDiscarded());
- if (empty.isFailed()) {
+ if (empty.isDiscarded()) {
+ promise.discard();
+ terminate(self());
+ } else if (empty.isFailed()) {
promise.fail(empty.failure());
terminate(self());
} else if (empty.get()) {
@@ -1731,7 +1741,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
CHECK(interval >= Seconds(0));
@@ -1752,17 +1762,23 @@ protected:
virtual void finalize()
{
// Cancel the operation if the user discards the future.
- if (promise.future().isDiscarded()) {
+ if (promise.future().hasDiscard()) {
discard<bool>(killers);
+
+ // TODO(benh): Discard our promise only after all 'killers' have
+ // completed (ready, failed, or discarded).
+ promise.discard();
}
}
private:
void killed(const Future<list<bool> >& kill)
{
- CHECK(!kill.isPending() && !kill.isDiscarded());
if (kill.isReady()) {
remove();
+ } else if (kill.isDiscarded()) {
+ promise.discard();
+ terminate(self());
} else if (kill.isFailed()) {
promise.fail("Failed to kill tasks in nested cgroups: " + kill.failure());
terminate(self());
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index 59facbf..bcad278 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -64,7 +64,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
check();
@@ -74,6 +74,10 @@ protected:
{
checking.discard();
filling.discard();
+
+ // TODO(benh): Discard our promise only after 'checking' and
+ // 'filling' have completed (ready, failed, or discarded).
+ promise.discard();
}
private:
@@ -192,7 +196,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
// Catch-up sequentially.
@@ -204,6 +208,10 @@ protected:
virtual void finalize()
{
catching.discard();
+
+ // TODO(benh): Discard our promise only after 'catching' has
+ // completed (ready, failed, or discarded).
+ promise.discard();
}
private:
@@ -230,6 +238,7 @@ private:
Timer::create(timeout, lambda::bind(&Self::timedout, catching));
}
+
void discarded()
{
LOG(INFO) << "Unable to catch-up position " << *it
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/log/consensus.cpp
----------------------------------------------------------------------
diff --git a/src/log/consensus.cpp b/src/log/consensus.cpp
index b89673a..580db20 100644
--- a/src/log/consensus.cpp
+++ b/src/log/consensus.cpp
@@ -65,7 +65,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
// Wait until there are enough (i.e., quorum of) replicas in the
@@ -81,6 +81,8 @@ protected:
// quorum of replicas. In that case, we no longer care about
// responses from other replicas, thus discarding them here.
discard(responses);
+
+ promise.discard();
}
private:
@@ -242,7 +244,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
// Wait until there are enough (i.e., quorum of) replicas in the
@@ -258,6 +260,8 @@ protected:
// quorum of replicas. In that case, we no longer care about
// responses from other replicas, thus discarding them here.
discard(responses);
+
+ promise.discard();
}
private:
@@ -380,7 +384,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
// Wait until there are enough (i.e., quorum of) replicas in the
@@ -396,6 +400,8 @@ protected:
// quorum of replicas. In that case, we no longer care about
// responses from other replicas, thus discarding them here.
discard(responses);
+
+ promise.discard();
}
private:
@@ -524,7 +530,7 @@ protected:
virtual void initialize()
{
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
runPromisePhase();
@@ -532,8 +538,13 @@ protected:
virtual void finalize()
{
+ // Discard the futures we're waiting for.
promising.discard();
writing.discard();
+
+ // TODO(benh): Discard our promise only after 'promising' and
+ // 'writing' have completed (ready, failed, or discarded).
+ promise.discard();
}
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/log/log.cpp
----------------------------------------------------------------------
diff --git a/src/log/log.cpp b/src/log/log.cpp
index e83f822..62dc928 100644
--- a/src/log/log.cpp
+++ b/src/log/log.cpp
@@ -304,6 +304,8 @@ Future<Shared<Replica> > LogProcess::recover()
.onAny(defer(self(), &Self::_recover));
}
+ // TODO(benh): Add 'onDiscard' callback to our returned future.
+
return promise->future();
}
@@ -419,6 +421,9 @@ Future<Nothing> LogReaderProcess::recover()
// set/failed when '_recover' is called.
process::Promise<Nothing>* promise = new process::Promise<Nothing>();
promises.push_back(promise);
+
+ // TODO(benh): Add 'onDiscard' callback to our returned future.
+
return promise->future();
}
@@ -579,6 +584,9 @@ Future<Nothing> LogWriterProcess::recover()
// set/failed when '_recover' is called.
process::Promise<Nothing>* promise = new process::Promise<Nothing>();
promises.push_back(promise);
+
+ // TODO(benh): Add 'onDiscard' callback to our returned future.
+
return promise->future();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index d06e5ad..1841f1f 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -103,7 +103,7 @@ protected:
LOG(INFO) << "Start recovering a replica";
// Stop when no one cares.
- promise.future().onDiscarded(lambda::bind(
+ promise.future().onDiscard(lambda::bind(
static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
// Check the current status of the local replica and decide if
@@ -119,6 +119,10 @@ protected:
// Cancel all operations if they are still pending.
discard(responses);
catching.discard();
+
+ // TODO(benh): Discard our promise only after 'catching' has
+ // completed (ready, failed, or discarded).
+ promise.discard();
}
private:
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/master/detector.cpp
----------------------------------------------------------------------
diff --git a/src/master/detector.cpp b/src/master/detector.cpp
index 7e10433..3a8aaed 100644
--- a/src/master/detector.cpp
+++ b/src/master/detector.cpp
@@ -147,7 +147,7 @@ MasterDetector::~MasterDetector() {}
StandaloneMasterDetectorProcess::~StandaloneMasterDetectorProcess()
{
foreach (Promise<Option<MasterInfo> >* promise, promises) {
- promise->future().discard();
+ promise->discard();
delete promise;
}
promises.clear();
@@ -253,7 +253,7 @@ ZooKeeperMasterDetectorProcess::ZooKeeperMasterDetectorProcess(
ZooKeeperMasterDetectorProcess::~ZooKeeperMasterDetectorProcess()
{
foreach (Promise<Option<MasterInfo> >* promise, promises) {
- promise->future().discard();
+ promise->discard();
delete promise;
}
promises.clear();
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 915885a..c7edbcd 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -330,7 +330,8 @@ void RegistrarProcess::update()
// Perform the store! Save the future so we can associate it with
// the mutations that are part of this update.
Future<bool> future =
- state->store(variable).then(defer(self(), &Self::_update, lambda::_1));
+ state->store(variable)
+ .then(defer(self(), &Self::_update, lambda::_1));
// TODO(benh): Add a timeout so we don't wait forever.
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/sasl/authenticatee.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/authenticatee.hpp b/src/sasl/authenticatee.hpp
index f1a677f..42a4eba 100644
--- a/src/sasl/authenticatee.hpp
+++ b/src/sasl/authenticatee.hpp
@@ -23,6 +23,7 @@
#include <mesos/mesos.hpp>
+#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/once.hpp>
@@ -51,7 +52,10 @@ public:
// Returns true if successfully authenticated otherwise false or an
// error. Note that we distinguish authentication failure (false)
// from a failed future in the event the future failed due to a
- // transient error and authentication can (should) be retried.
+ // transient error and authentication can (should) be
+ // retried. Discarding the future will cause the future to fail if
+ // it hasn't already completed since we have already started the
+ // authentication procedure and can't reliably cancel.
process::Future<bool> authenticate(const process::UPID& pid);
private:
@@ -172,6 +176,9 @@ public:
status = STARTING;
+ // Stop authenticating if nobody cares.
+ promise.future().onDiscard(defer(self(), &Self::discarded));
+
return promise.future();
}
@@ -313,6 +320,12 @@ protected:
promise.fail("Authentication error: " + error);
}
+ void discarded()
+ {
+ status = DISCARDED;
+ promise.fail("Authentication discarded");
+ }
+
private:
static int user(
void* context,
@@ -354,7 +367,8 @@ private:
STEPPING,
COMPLETED,
FAILED,
- ERROR
+ ERROR,
+ DISCARDED
} status;
sasl_conn_t* connection;
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/sasl/authenticator.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/authenticator.hpp b/src/sasl/authenticator.hpp
index 1478f67..e1db783 100644
--- a/src/sasl/authenticator.hpp
+++ b/src/sasl/authenticator.hpp
@@ -27,6 +27,7 @@
#include <mesos/mesos.hpp>
+#include <process/defer.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/once.hpp>
@@ -51,6 +52,13 @@ public:
Authenticator(const process::UPID& pid);
~Authenticator();
+ // Returns true if successfully authenticated otherwise false or an
+ // error. Note that we distinguish authentication failure (false)
+ // from a failed future in the event the future failed due to a
+ // transient error and authentication can (should) be
+ // retried. Discarding the future will cause the future to fail if
+ // it hasn't already completed since we have already started the
+ // authentication procedure and can't reliably cancel.
process::Future<bool> authenticate();
private:
@@ -202,6 +210,9 @@ public:
status = STARTING;
+ // Stop authenticating if nobody cares.
+ promise.future().onDiscard(defer(self(), &Self::discarded));
+
return promise.future();
}
@@ -283,6 +294,12 @@ protected:
handle(result, output, length);
}
+ void discarded()
+ {
+ status = DISCARDED;
+ promise.fail("Authentication discarded");
+ }
+
private:
static int getopt(
void* context,
@@ -351,7 +368,8 @@ private:
STEPPING,
COMPLETED,
FAILED,
- ERROR
+ ERROR,
+ DISCARDED
} status;
sasl_callback_t callbacks[2];
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/slave/gc.cpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.cpp b/src/slave/gc.cpp
index 405350b..3720255 100644
--- a/src/slave/gc.cpp
+++ b/src/slave/gc.cpp
@@ -44,7 +44,7 @@ namespace slave {
GarbageCollectorProcess::~GarbageCollectorProcess()
{
foreachvalue (const PathInfo& info, paths) {
- info.promise->future().discard();
+ info.promise->discard();
}
}
@@ -93,8 +93,8 @@ bool GarbageCollectorProcess::unschedule(const string& path)
// Locate the path.
foreach (const PathInfo& info, paths.get(timeout)) {
if (info.path == path) {
- // Discard the future.
- info.promise->future().discard();
+ // Discard the promise.
+ info.promise->discard();
// Clean up the maps.
CHECK(paths.remove(timeout, info));
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/slave/gc.hpp
----------------------------------------------------------------------
diff --git a/src/slave/gc.hpp b/src/slave/gc.hpp
index 328aa31..7b6fb83 100644
--- a/src/slave/gc.hpp
+++ b/src/slave/gc.hpp
@@ -62,12 +62,15 @@ public:
// The future will fail if the path did not exist, or on error.
// The future will be discarded if the path was unscheduled, or
// was rescheduled.
+ // Note that you currently cannot discard a returned future, instead
+ // you must call unschedule.
process::Future<Nothing> schedule(const Duration& d, const std::string& path);
// Unschedules the specified path for removal.
// The future will be true if the path has been unscheduled.
// The future will be false if the path is not scheduled for
// removal, or the path has already being removed.
+ // Note that you currently cannot discard a returned future.
process::Future<bool> unschedule(const std::string& path);
// Deletes all the directories, whose scheduled garbage collection time
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/zookeeper/contender.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/contender.cpp b/src/zookeeper/contender.cpp
index 6710da4..c2c7993 100644
--- a/src/zookeeper/contender.cpp
+++ b/src/zookeeper/contender.cpp
@@ -84,19 +84,19 @@ LeaderContenderProcess::LeaderContenderProcess(
LeaderContenderProcess::~LeaderContenderProcess()
{
if (contending.isSome()) {
- contending.get()->future().discard();
+ contending.get()->discard();
delete contending.get();
contending = None();
}
if (watching.isSome()) {
- watching.get()->future().discard();
+ watching.get()->discard();
delete watching.get();
watching = None();
}
if (withdrawing.isSome()) {
- withdrawing.get()->future().discard();
+ withdrawing.get()->discard();
delete withdrawing.get();
withdrawing = None();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/24a31366/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 793763e..247361d 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -62,7 +62,7 @@ void discard(queue<T*>* queue)
while (!queue->empty()) {
T* t = queue->front();
queue->pop();
- t->promise.future().discard();
+ t->promise.discard();
delete t;
}
}