You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/05/14 08:15:21 UTC
svn commit: r1338037 - in /incubator/mesos/trunk/src: log/network.hpp
tests/zookeeper_tests.cpp zookeeper/group.cpp zookeeper/group.hpp
Author: benh
Date: Mon May 14 06:15:20 2012
New Revision: 1338037
URL: http://svn.apache.org/viewvc?rev=1338037&view=rev
Log:
Updated Group and Group::Membership API.
(1) Renamed Group::info to Group::data (and associated changes).
(2) Added Group::Membership::watch to explicitly watch for a single
membership's cancellation (and associated GroupProcess changes).
(3) Fixed a but in GroupProcess::update that was deleting a Watch
object when also re-enqueueing it.
(4) Updated the GroupProcess destructor to fail all outstanding
futures associated with the Group.
Modified:
incubator/mesos/trunk/src/log/network.hpp
incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
incubator/mesos/trunk/src/zookeeper/group.cpp
incubator/mesos/trunk/src/zookeeper/group.hpp
Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Mon May 14 06:15:20 2012
@@ -32,6 +32,7 @@
#include "common/foreach.hpp"
#include "common/lambda.hpp"
+#include "common/logging.hpp"
#include "common/seconds.hpp"
#include "common/utils.hpp"
@@ -103,7 +104,7 @@ private:
zookeeper::Group* group;
process::Executor executor;
process::Future<std::set<zookeeper::Group::Membership> > memberships;
- process::Future<std::list<std::string> > infos;
+ process::Future<std::list<std::string> > datas;
};
@@ -276,19 +277,19 @@ inline void ZooKeeperNetwork::watched()
std::list<process::Future<std::string> > futures;
foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
- futures.push_back(group->info(membership));
+ futures.push_back(group->data(membership));
}
- infos = process::collect(futures, process::Timeout(5.0));
- infos.onAny(executor.defer(lambda::bind(&This::collected, this)));
+ datas = process::collect(futures, process::Timeout(5.0));
+ datas.onAny(executor.defer(lambda::bind(&This::collected, this)));
}
inline void ZooKeeperNetwork::collected()
{
- if (infos.isFailed()) {
+ if (datas.isFailed()) {
LOG(WARNING) << "Failed to get data for ZooKeeper group members: "
- << infos.failure();
+ << datas.failure();
// Try again later assuming empty group. Note that this does not
// remove any of the current group members.
@@ -296,13 +297,13 @@ inline void ZooKeeperNetwork::collected(
return;
}
- CHECK(infos.isReady()); // Not expecting collect to discard futures.
+ CHECK(datas.isReady()); // Not expecting collect to discard futures.
std::set<process::UPID> pids;
- foreach (const std::string& info, infos.get()) {
- process::UPID pid(info);
- CHECK(pid) << "Failed to parse '" << info << "'";
+ foreach (const std::string& data, datas.get()) {
+ process::UPID pid(data);
+ CHECK(pid) << "Failed to parse '" << data << "'";
pids.insert(pid);
}
Modified: incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/zookeeper_tests.cpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/zookeeper_tests.cpp Mon May 14 06:15:20 2012
@@ -93,14 +93,14 @@ TEST_F(ZooKeeperTest, Group)
EXPECT_EQ(1, memberships.get().size());
EXPECT_EQ(1, memberships.get().count(membership.get()));
- process::Future<std::string> info = group.info(membership.get());
+ process::Future<std::string> data = group.data(membership.get());
- info.await();
+ data.await();
- ASSERT_FALSE(info.isFailed()) << info.failure();
- ASSERT_FALSE(info.isDiscarded());
- ASSERT_TRUE(info.isReady());
- EXPECT_EQ("hello world", info.get());
+ ASSERT_FALSE(data.isFailed()) << data.failure();
+ ASSERT_FALSE(data.isDiscarded());
+ ASSERT_TRUE(data.isReady());
+ EXPECT_EQ("hello world", data.get());
process::Future<bool> cancellation = group.cancel(membership.get());
@@ -117,6 +117,9 @@ TEST_F(ZooKeeperTest, Group)
ASSERT_TRUE(memberships.isReady());
EXPECT_EQ(0, memberships.get().size());
+
+ ASSERT_TRUE(membership.get().cancelled().isReady());
+ ASSERT_TRUE(membership.get().cancelled().get());
}
@@ -150,7 +153,7 @@ TEST_F(ZooKeeperTest, GroupJoinWithDisco
}
-TEST_F(ZooKeeperTest, GroupInfoWithDisconnect)
+TEST_F(ZooKeeperTest, GroupDataWithDisconnect)
{
zookeeper::Group group(zks->connectString(), NO_TIMEOUT, "/test/");
@@ -174,18 +177,18 @@ TEST_F(ZooKeeperTest, GroupInfoWithDisco
zks->shutdownNetwork();
- process::Future<std::string> info = group.info(membership.get());
+ process::Future<std::string> data = group.data(membership.get());
- EXPECT_TRUE(info.isPending());
+ EXPECT_TRUE(data.isPending());
zks->startNetwork();
- info.await();
+ data.await();
- ASSERT_FALSE(info.isFailed()) << info.failure();
- ASSERT_FALSE(info.isDiscarded());
- ASSERT_TRUE(info.isReady());
- EXPECT_EQ("hello world", info.get());
+ ASSERT_FALSE(data.isFailed()) << data.failure();
+ ASSERT_FALSE(data.isDiscarded());
+ ASSERT_TRUE(data.isReady());
+ EXPECT_EQ("hello world", data.get());
}
@@ -211,16 +214,16 @@ TEST_F(ZooKeeperTest, GroupCancelWithDis
EXPECT_EQ(1, memberships.get().size());
EXPECT_EQ(1, memberships.get().count(membership.get()));
- process::Future<std::string> info = group.info(membership.get());
+ process::Future<std::string> data = group.data(membership.get());
- EXPECT_TRUE(info.isPending());
+ EXPECT_TRUE(data.isPending());
- info.await();
+ data.await();
- ASSERT_FALSE(info.isFailed()) << info.failure();
- ASSERT_FALSE(info.isDiscarded());
- ASSERT_TRUE(info.isReady());
- EXPECT_EQ("hello world", info.get());
+ ASSERT_FALSE(data.isFailed()) << data.failure();
+ ASSERT_FALSE(data.isDiscarded());
+ ASSERT_TRUE(data.isReady());
+ EXPECT_EQ("hello world", data.get());
zks->shutdownNetwork();
@@ -243,6 +246,9 @@ TEST_F(ZooKeeperTest, GroupCancelWithDis
ASSERT_TRUE(memberships.isReady());
EXPECT_EQ(0, memberships.get().size());
+
+ ASSERT_TRUE(membership.get().cancelled().isReady());
+ ASSERT_TRUE(membership.get().cancelled().get());
}
@@ -285,4 +291,78 @@ TEST_F(ZooKeeperTest, GroupWatchWithSess
ASSERT_TRUE(memberships.isReady());
EXPECT_EQ(0, memberships.get().size());
+
+ ASSERT_TRUE(membership.get().cancelled().isReady());
+ ASSERT_FALSE(membership.get().cancelled().get());
+}
+
+
+TEST_F(ZooKeeperTest, MultipleGroups)
+{
+ zookeeper::Group group1(zks->connectString(), NO_TIMEOUT, "/test/");
+ zookeeper::Group group2(zks->connectString(), NO_TIMEOUT, "/test/");
+
+ process::Future<zookeeper::Group::Membership> membership1 =
+ group1.join("group 1");
+
+ membership1.await();
+
+ ASSERT_FALSE(membership1.isFailed()) << membership1.failure();
+ ASSERT_FALSE(membership1.isDiscarded());
+ ASSERT_TRUE(membership1.isReady());
+
+ process::Future<zookeeper::Group::Membership> membership2 =
+ group2.join("group 2");
+
+ membership2.await();
+
+ ASSERT_FALSE(membership2.isFailed()) << membership2.failure();
+ ASSERT_FALSE(membership2.isDiscarded());
+ ASSERT_TRUE(membership2.isReady());
+
+ process::Future<std::set<zookeeper::Group::Membership> > memberships1 =
+ group1.watch();
+
+ memberships1.await();
+
+ ASSERT_TRUE(memberships1.isReady());
+ EXPECT_EQ(2, memberships1.get().size());
+ EXPECT_EQ(1, memberships1.get().count(membership1.get()));
+ EXPECT_EQ(1, memberships1.get().count(membership2.get()));
+
+ process::Future<std::set<zookeeper::Group::Membership> > memberships2 =
+ group2.watch();
+
+ memberships2.await();
+
+ ASSERT_TRUE(memberships2.isReady());
+ EXPECT_EQ(2, memberships2.get().size());
+ EXPECT_EQ(1, memberships2.get().count(membership1.get()));
+ EXPECT_EQ(1, memberships2.get().count(membership2.get()));
+
+ process::Future<bool> cancelled;
+
+ // Now watch the membership owned by group1 from group2.
+ foreach (const zookeeper::Group::Membership& membership, memberships2.get()) {
+ if (membership == membership1.get()) {
+ cancelled = membership.cancelled();
+ break;
+ }
+ }
+
+ process::Future<Option<int64_t> > session1 = group1.session();
+
+ session1.await();
+
+ ASSERT_FALSE(session1.isFailed()) << session1.failure();
+ ASSERT_FALSE(session1.isDiscarded());
+ ASSERT_TRUE(session1.isReady());
+ ASSERT_TRUE(session1.get().isSome());
+
+ zks->expireSession(session1.get().get());
+
+ cancelled.await();
+
+ ASSERT_TRUE(cancelled.isReady());
+ ASSERT_FALSE(cancelled.get());
}
Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Mon May 14 06:15:20 2012
@@ -48,9 +48,9 @@ public:
void initialize();
// Group implementation.
- Future<Group::Membership> join(const string& info);
+ Future<Group::Membership> join(const string& data);
Future<bool> cancel(const Group::Membership& membership);
- Future<string> info(const Group::Membership& membership);
+ Future<string> data(const Group::Membership& membership);
Future<set<Group::Membership> > watch(
const set<Group::Membership>& expected);
Future<Option<int64_t> > session();
@@ -64,9 +64,9 @@ public:
void deleted(const string& path);
private:
- Result<Group::Membership> doJoin(const string& info);
+ Result<Group::Membership> doJoin(const string& data);
Result<bool> doCancel(const Group::Membership& membership);
- Result<string> doInfo(const Group::Membership& membership);
+ Result<string> doData(const Group::Membership& membership);
// Attempts to cache the current set of memberships.
bool cache();
@@ -108,8 +108,8 @@ private:
struct Join
{
- Join(const string& _info) : info(_info) {}
- string info;
+ Join(const string& _data) : data(_data) {}
+ string data;
Promise<Group::Membership> promise;
};
@@ -121,9 +121,9 @@ private:
Promise<bool> promise;
};
- struct Info
+ struct Data
{
- Info(const Group::Membership& _membership)
+ Data(const Group::Membership& _membership)
: membership(_membership) {}
Group::Membership membership;
Promise<string> promise;
@@ -140,18 +140,37 @@ private:
struct {
queue<Join*> joins;
queue<Cancel*> cancels;
- queue<Info*> infos;
+ queue<Data*> datas;
queue<Watch*> watches;
} pending;
bool retrying;
- map<Group::Membership, string> owned;
-
- Option<set<Group::Membership> > memberships; // The cache.
+ // Expected ZooKeeper sequence numbers (either owned/created by this
+ // group instance or not) and the promise we associate with their
+ // "cancellation" (i.e., no longer part of the group).
+ map<uint64_t, Promise<bool>*> owned;
+ map<uint64_t, Promise<bool>*> unowned;
+
+ // Cache of owned + unowned, where 'None' represents an invalid
+ // cache and 'Some' represents a valid cache.
+ Option<set<Group::Membership> > memberships;
};
+// Helper for failing a queue of promises.
+template <typename T>
+void fail(queue<T*>* queue, const string& message)
+{
+ while (!queue->empty()) {
+ T* t = queue->front();
+ queue->pop();
+ t->promise.fail(message);
+ delete t;
+ }
+}
+
+
GroupProcess::GroupProcess(
const string& _servers,
const seconds& _timeout,
@@ -171,6 +190,11 @@ GroupProcess::GroupProcess(
GroupProcess::~GroupProcess()
{
+ fail(&pending.joins, "No longer watching group");
+ fail(&pending.cancels, "No longer watching group");
+ fail(&pending.datas, "No longer watching group");
+ fail(&pending.watches, "No longer watching group");
+
delete zk;
delete watcher;
}
@@ -186,14 +210,14 @@ void GroupProcess::initialize()
}
-Future<Group::Membership> GroupProcess::join(const string& info)
+Future<Group::Membership> GroupProcess::join(const string& data)
{
if (error.isSome()) {
Promise<Group::Membership> promise;
promise.fail(error.get());
return promise.future();
} else if (state != CONNECTED) {
- Join* join = new Join(info);
+ Join* join = new Join(data);
pending.joins.push(join);
return join->promise.future();
}
@@ -206,14 +230,14 @@ Future<Group::Membership> GroupProcess::
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
- Result<Group::Membership> membership = doJoin(info);
+ Result<Group::Membership> membership = doJoin(data);
if (membership.isNone()) { // Try again later.
if (!retrying) {
delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
retrying = true;
}
- Join* join = new Join(info);
+ Join* join = new Join(data);
pending.joins.push(join);
return join->promise.future();
} else if (membership.isError()) {
@@ -222,8 +246,6 @@ Future<Group::Membership> GroupProcess::
return promise.future();
}
- owned.insert(make_pair(membership.get(), info));
-
return membership.get();
}
@@ -234,8 +256,13 @@ Future<bool> GroupProcess::cancel(const
Promise<bool> promise;
promise.fail(error.get());
return promise.future();
- } else if (owned.count(membership) == 0) {
- return false; // TODO(benh): Should this be an error?
+ } else if (owned.count(membership.id()) == 0) {
+ // TODO(benh): Should this be an error? Right now a user can't
+ // differentiate when 'false' means they can't cancel because it's
+ // not owned or because it's already been cancelled (explicitly by
+ // them or implicitly due to session expiration or operator
+ // error).
+ return false;
}
if (state != CONNECTED) {
@@ -268,28 +295,28 @@ Future<bool> GroupProcess::cancel(const
}
-Future<string> GroupProcess::info(const Group::Membership& membership)
+Future<string> GroupProcess::data(const Group::Membership& membership)
{
if (error.isSome()) {
Promise<string> promise;
promise.fail(error.get());
return promise.future();
} else if (state != CONNECTED) {
- Info* info = new Info(membership);
- pending.infos.push(info);
- return info->promise.future();
+ Data* data = new Data(membership);
+ pending.datas.push(data);
+ return data->promise.future();
}
// TODO(benh): Only attempt if the pending queue is empty so that a
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
- Result<string> result = doInfo(membership);
+ Result<string> result = doData(membership);
if (result.isNone()) { // Try again later.
- Info* info = new Info(membership);
- pending.infos.push(info);
- return info->promise.future();
+ Data* data = new Data(membership);
+ pending.datas.push(data);
+ return data->promise.future();
} else if (result.isError()) {
Promise<string> promise;
promise.fail(result.error());
@@ -423,11 +450,28 @@ void GroupProcess::reconnecting()
void GroupProcess::expired()
{
+ // Invalidate the cache.
memberships = Option<set<Group::Membership> >::none();
- owned.clear();
+
+ // Set all owned memberships as cancelled.
+ foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
+ cancelled->set(false); // Since this was not requested.
+ owned.erase(sequence); // Okay since iterating over a copy.
+ delete cancelled;
+ }
+
+ CHECK(owned.empty());
+
+ // Note that we DO NOT clear unowned. The next time we try and cache
+ // the memberships we'll trigger any cancelled unowned memberships
+ // then. We could imagine doing this for owned memberships too, but
+ // for now we proactively cancel them above.
+
state = DISCONNECTED;
+
delete zk;
zk = new ZooKeeper(servers, timeout, watcher);
+
state = CONNECTING;
}
@@ -461,16 +505,16 @@ void GroupProcess::deleted(const string&
}
-Result<Group::Membership> GroupProcess::doJoin(const string& info)
+Result<Group::Membership> GroupProcess::doJoin(const string& data)
{
CHECK(error.isNone()) << ": " << error.get();
CHECK(state == CONNECTED);
// Create a new ephemeral node to represent a new member and use the
- // the specified info as it's contents.
+ // the specified data as it's contents.
string result;
- int code = zk->create(znode + "/", info, acl,
+ int code = zk->create(znode + "/", data, acl,
ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
@@ -485,7 +529,8 @@ Result<Group::Membership> GroupProcess::
: "Failed to create ephemeral node in ZooKeeper");
}
- // Invalidate the cache.
+ // Invalidate the cache (it will/should get immediately populated
+ // via the 'updated' callback of our ZooKeeper watcher).
memberships = Option<set<Group::Membership> >::none();
// Save the sequence number but only grab the basename. Example:
@@ -495,9 +540,10 @@ Result<Group::Membership> GroupProcess::
Try<uint64_t> sequence = utils::numify<uint64_t>(result);
CHECK(sequence.isSome()) << sequence.error();
- Group::Membership membership(sequence.get());
+ Promise<bool>* cancelled = new Promise<bool>();
+ owned[sequence.get()] = cancelled;
- return membership;
+ return Group::Membership(sequence.get(), cancelled->future());
}
@@ -529,16 +575,22 @@ Result<bool> GroupProcess::doCancel(cons
: "Failed to remove ephemeral node in ZooKeeper");
}
- // Invalidate the cache.
+ // Invalidate the cache (it will/should get immediately populated
+ // via the 'updated' callback of our ZooKeeper watcher).
memberships = Option<set<Group::Membership> >::none();
- owned.erase(membership);
+ // Let anyone waiting know the membership has been cancelled.
+ CHECK(owned.count(membership.id()) == 1);
+ Promise<bool>* cancelled = owned[membership.id()];
+ cancelled->set(true);
+ owned.erase(membership.id());
+ delete cancelled;
return true;
}
-Result<string> GroupProcess::doInfo(const Group::Membership& membership)
+Result<string> GroupProcess::doData(const Group::Membership& membership)
{
CHECK(error.isNone()) << ": " << error.get();
CHECK(state == CONNECTED);
@@ -574,7 +626,7 @@ Result<string> GroupProcess::doInfo(cons
bool GroupProcess::cache()
{
- // Invalidate first.
+ // Invalidate first (if it's not already).
memberships = Option<set<Group::Membership> >::none();
// Get all children to determine current memberships.
@@ -596,8 +648,8 @@ bool GroupProcess::cache()
return false;
}
- // Convert results to memberships.
- set<Group::Membership> current;
+ // Convert results to sequence numbers.
+ set<uint64_t> sequences;
foreach (const string& result, results) {
Try<uint64_t> sequence = utils::numify<uint64_t>(result);
@@ -609,7 +661,40 @@ bool GroupProcess::cache()
continue;
}
- current.insert(Group::Membership(sequence.get()));
+ sequences.insert(sequence.get());
+ }
+
+ // Cache current memberships, cancelling those that are now missing.
+ set<Group::Membership> current;
+
+ foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
+ if (sequences.count(sequence) == 0) {
+ cancelled->set(false);
+ owned.erase(sequence); // Okay since iterating over a copy.
+ delete cancelled;
+ } else {
+ current.insert(Group::Membership(sequence, cancelled->future()));
+ sequences.erase(sequence);
+ }
+ }
+
+ foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
+ Promise<bool>* cancelled = unowned[sequence];
+ if (sequences.count(sequence) == 0) {
+ cancelled->set(false);
+ unowned.erase(sequence); // Okay since iterating over a copy.
+ delete cancelled;
+ } else {
+ current.insert(Group::Membership(sequence, cancelled->future()));
+ sequences.erase(sequence);
+ }
+ }
+
+ // Add any remaining (i.e., unexpected) sequences.
+ foreach (uint64_t sequence, sequences) {
+ Promise<bool>* cancelled = new Promise<bool>();
+ unowned[sequence] = cancelled;
+ current.insert(Group::Membership(sequence, cancelled->future()));
}
memberships = current;
@@ -621,16 +706,18 @@ bool GroupProcess::cache()
void GroupProcess::update()
{
CHECK(memberships.isSome());
- size_t size = pending.watches.size();
- for (int i = 0; i < size; i++) {
- if (memberships.get() != pending.watches.front()->expected) {
- pending.watches.front()->promise.set(memberships.get());
+ const size_t size = pending.watches.size();
+ for (size_t i = 0; i < size; i++) {
+ Watch* watch = pending.watches.front();
+ if (memberships.get() != watch->expected) {
+ watch->promise.set(memberships.get());
+ pending.watches.pop();
+ delete watch;
} else {
- pending.watches.push(pending.watches.front());
+ // Don't delete the watch, but push it to the back of the queue.
+ pending.watches.push(watch);
+ pending.watches.pop();
}
- Watch* watch = pending.watches.front();
- pending.watches.pop();
- delete watch;
}
}
@@ -642,52 +729,56 @@ bool GroupProcess::sync()
// Do joins.
while (!pending.joins.empty()) {
- Result<Group::Membership> membership = doJoin(pending.joins.front()->info);
+ Join* join = pending.joins.front();
+ Result<Group::Membership> membership = doJoin(join->data);
if (membership.isNone()) {
return false; // Try again later.
} else if (membership.isError()) {
- pending.joins.front()->promise.fail(membership.error());
+ join->promise.fail(membership.error());
} else {
- owned.insert(make_pair(membership.get(), pending.joins.front()->info));
- pending.joins.front()->promise.set(membership.get());
+ join->promise.set(membership.get());
}
- Join* join = pending.joins.front();
pending.joins.pop();
delete join;
}
// Do cancels.
while (!pending.cancels.empty()) {
- Result<bool> cancellation = doCancel(pending.cancels.front()->membership);
+ Cancel* cancel = pending.cancels.front();
+ Result<bool> cancellation = doCancel(cancel->membership);
if (cancellation.isNone()) {
return false; // Try again later.
} else if (cancellation.isError()) {
- pending.cancels.front()->promise.fail(cancellation.error());
+ cancel->promise.fail(cancellation.error());
} else {
- pending.cancels.front()->promise.set(cancellation.get());
+ cancel->promise.set(cancellation.get());
}
- Cancel* cancel = pending.cancels.front();
pending.cancels.pop();
delete cancel;
}
- // Do infos.
- while (!pending.infos.empty()) {
+ // Do datas.
+ while (!pending.datas.empty()) {
+ Data* data = pending.datas.front();
// TODO(benh): Ignore if future has been discarded?
- Result<string> result = doInfo(pending.infos.front()->membership);
+ Result<string> result = doData(data->membership);
if (result.isNone()) {
return false; // Try again later.
} else if (result.isError()) {
- pending.infos.front()->promise.fail(result.error());
+ data->promise.fail(result.error());
} else {
- pending.infos.front()->promise.set(result.get());
+ data->promise.set(result.get());
}
- Info* info = pending.infos.front();
- pending.infos.pop();
- delete info;
+ pending.datas.pop();
+ delete data;
}
- // Get cache of memberships if we don't have one.
+ // Get cache of memberships if we don't have one. Note that we do
+ // this last because any joins or cancels above will invalidate our
+ // cache, so it would be nice to get it validated again at the
+ // end. The side-effect here is that users will learn of joins and
+ // cancels first through any explicit futures for them rather than
+ // watches.
if (memberships.isNone()) {
if (!cache()) {
return false; // Try again later (if no error).
@@ -716,25 +807,13 @@ void GroupProcess::retry(double seconds)
}
-template <typename T>
-void fail(queue<T*>* queue, const string& message)
-{
- while (!queue->empty()) {
- T* t = queue->front();
- queue->pop();
- t->promise.fail(message);
- delete t;
- }
-}
-
-
void GroupProcess::abort()
{
CHECK(error.isSome());
fail(&pending.joins, error.get());
fail(&pending.cancels, error.get());
- fail(&pending.infos, error.get());
+ fail(&pending.datas, error.get());
fail(&pending.watches, error.get());
}
@@ -758,9 +837,9 @@ Group::~Group()
}
-Future<Group::Membership> Group::join(const string& info)
+Future<Group::Membership> Group::join(const string& data)
{
- return dispatch(process, &GroupProcess::join, info);
+ return dispatch(process, &GroupProcess::join, data);
}
@@ -770,9 +849,9 @@ Future<bool> Group::cancel(const Group::
}
-Future<string> Group::info(const Group::Membership& membership)
+Future<string> Group::data(const Group::Membership& membership)
{
- return dispatch(process, &GroupProcess::info, membership);
+ return dispatch(process, &GroupProcess::data, membership);
}
Modified: incubator/mesos/trunk/src/zookeeper/group.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.hpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.hpp Mon May 14 06:15:20 2012
@@ -23,7 +23,9 @@ class Group
public:
// Represents a group membership. Note that we order memberships by
// membership id (that is, an older membership is ordered before a
- // younger membership).
+ // younger membership). In addition, we do not use the "cancelled"
+ // future to compare memberships so that two memberships created
+ // from different Group instances will still be considered the same.
struct Membership
{
bool operator == (const Membership& that) const
@@ -56,12 +58,25 @@ public:
return sequence;
}
+ // Returns a future that is only satisfied once this membership
+ // has been cancelled. In which case, the value of the future is
+ // true if you own this membership and cancelled it by invoking
+ // Group::cancel. Otherwise, the value of the future is false (and
+ // could signify cancellation due to a sesssion expiration or
+ // operator error).
+ process::Future<bool> cancelled() const
+ {
+ return cancelled_;
+ }
+
private:
friend class GroupProcess; // Creates and manages memberships.
- Membership(uint64_t _sequence) : sequence(_sequence) {}
+ Membership(uint64_t _sequence, const process::Future<bool>& cancelled)
+ : sequence(_sequence), cancelled_(cancelled) {}
uint64_t sequence;
+ process::Future<bool> cancelled_;
};
// Constructs this group using the specified ZooKeeper servers (list
@@ -74,19 +89,19 @@ public:
// Returns the result of trying to join a "group" in ZooKeeper. If
// succesful, an "owned" membership will be returned whose
- // retrievable information will be a copy of the specified
- // parameter. A membership is not "renewed" in the event of a
- // ZooKeeper session expiration. Instead, a client should watch the
- // group memberships and rejoin the group as appropriate.
- process::Future<Membership> join(const std::string& info);
+ // retrievable data will be a copy of the specified parameter. A
+ // membership is not "renewed" in the event of a ZooKeeper session
+ // expiration. Instead, a client should watch the group memberships
+ // and rejoin the group as appropriate.
+ process::Future<Membership> join(const std::string& data);
// Returns the result of trying to cancel a membership. Note that
// only memberships that are "owned" (see join) can be canceled.
process::Future<bool> cancel(const Membership& membership);
- // Returns the result of trying to fetch the information associated
- // with a group membership.
- process::Future<std::string> info(const Membership& membership);
+ // Returns the result of trying to fetch the data associated with a
+ // group membership.
+ process::Future<std::string> data(const Membership& membership);
// Returns a future that gets set when the group memberships differ
// from the "expected" memberships specified.