You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/05/14 08:15:21 UTC

svn commit: r1338037 - in /incubator/mesos/trunk/src: log/network.hpp tests/zookeeper_tests.cpp zookeeper/group.cpp zookeeper/group.hpp

Author: benh
Date: Mon May 14 06:15:20 2012
New Revision: 1338037

URL: http://svn.apache.org/viewvc?rev=1338037&view=rev
Log:
Updated Group and Group::Membership API.

(1) Renamed Group::info to Group::data (and associated changes).
(2) Added Group::Membership::watch to explicitly watch for a single
    membership's cancellation (and associated GroupProcess changes).
(3) Fixed a but in GroupProcess::update that was deleting a Watch
    object when also re-enqueueing it.
(4) Updated the GroupProcess destructor to fail all outstanding
    futures associated with the Group.

Modified:
    incubator/mesos/trunk/src/log/network.hpp
    incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
    incubator/mesos/trunk/src/zookeeper/group.cpp
    incubator/mesos/trunk/src/zookeeper/group.hpp

Modified: incubator/mesos/trunk/src/log/network.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/log/network.hpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/log/network.hpp (original)
+++ incubator/mesos/trunk/src/log/network.hpp Mon May 14 06:15:20 2012
@@ -32,6 +32,7 @@
 
 #include "common/foreach.hpp"
 #include "common/lambda.hpp"
+#include "common/logging.hpp"
 #include "common/seconds.hpp"
 #include "common/utils.hpp"
 
@@ -103,7 +104,7 @@ private:
   zookeeper::Group* group;
   process::Executor executor;
   process::Future<std::set<zookeeper::Group::Membership> > memberships;
-  process::Future<std::list<std::string> > infos;
+  process::Future<std::list<std::string> > datas;
 };
 
 
@@ -276,19 +277,19 @@ inline void ZooKeeperNetwork::watched()
   std::list<process::Future<std::string> > futures;
 
   foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
-    futures.push_back(group->info(membership));
+    futures.push_back(group->data(membership));
   }
 
-  infos = process::collect(futures, process::Timeout(5.0));
-  infos.onAny(executor.defer(lambda::bind(&This::collected, this)));
+  datas = process::collect(futures, process::Timeout(5.0));
+  datas.onAny(executor.defer(lambda::bind(&This::collected, this)));
 }
 
 
 inline void ZooKeeperNetwork::collected()
 {
-  if (infos.isFailed()) {
+  if (datas.isFailed()) {
     LOG(WARNING) << "Failed to get data for ZooKeeper group members: "
-                 << infos.failure();
+                 << datas.failure();
 
     // Try again later assuming empty group. Note that this does not
     // remove any of the current group members.
@@ -296,13 +297,13 @@ inline void ZooKeeperNetwork::collected(
     return;
   }
 
-  CHECK(infos.isReady()); // Not expecting collect to discard futures.
+  CHECK(datas.isReady()); // Not expecting collect to discard futures.
 
   std::set<process::UPID> pids;
 
-  foreach (const std::string& info, infos.get()) {
-    process::UPID pid(info);
-    CHECK(pid) << "Failed to parse '" << info << "'";
+  foreach (const std::string& data, datas.get()) {
+    process::UPID pid(data);
+    CHECK(pid) << "Failed to parse '" << data << "'";
     pids.insert(pid);
   }
 

Modified: incubator/mesos/trunk/src/tests/zookeeper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/zookeeper_tests.cpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/zookeeper_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/zookeeper_tests.cpp Mon May 14 06:15:20 2012
@@ -93,14 +93,14 @@ TEST_F(ZooKeeperTest, Group)
   EXPECT_EQ(1, memberships.get().size());
   EXPECT_EQ(1, memberships.get().count(membership.get()));
 
-  process::Future<std::string> info = group.info(membership.get());
+  process::Future<std::string> data = group.data(membership.get());
 
-  info.await();
+  data.await();
 
-  ASSERT_FALSE(info.isFailed()) << info.failure();
-  ASSERT_FALSE(info.isDiscarded());
-  ASSERT_TRUE(info.isReady());
-  EXPECT_EQ("hello world", info.get());
+  ASSERT_FALSE(data.isFailed()) << data.failure();
+  ASSERT_FALSE(data.isDiscarded());
+  ASSERT_TRUE(data.isReady());
+  EXPECT_EQ("hello world", data.get());
 
   process::Future<bool> cancellation = group.cancel(membership.get());
 
@@ -117,6 +117,9 @@ TEST_F(ZooKeeperTest, Group)
 
   ASSERT_TRUE(memberships.isReady());
   EXPECT_EQ(0, memberships.get().size());
+
+  ASSERT_TRUE(membership.get().cancelled().isReady());
+  ASSERT_TRUE(membership.get().cancelled().get());
 }
 
 
@@ -150,7 +153,7 @@ TEST_F(ZooKeeperTest, GroupJoinWithDisco
 }
 
 
-TEST_F(ZooKeeperTest, GroupInfoWithDisconnect)
+TEST_F(ZooKeeperTest, GroupDataWithDisconnect)
 {
   zookeeper::Group group(zks->connectString(), NO_TIMEOUT, "/test/");
 
@@ -174,18 +177,18 @@ TEST_F(ZooKeeperTest, GroupInfoWithDisco
 
   zks->shutdownNetwork();
 
-  process::Future<std::string> info = group.info(membership.get());
+  process::Future<std::string> data = group.data(membership.get());
 
-  EXPECT_TRUE(info.isPending());
+  EXPECT_TRUE(data.isPending());
 
   zks->startNetwork();
 
-  info.await();
+  data.await();
 
-  ASSERT_FALSE(info.isFailed()) << info.failure();
-  ASSERT_FALSE(info.isDiscarded());
-  ASSERT_TRUE(info.isReady());
-  EXPECT_EQ("hello world", info.get());
+  ASSERT_FALSE(data.isFailed()) << data.failure();
+  ASSERT_FALSE(data.isDiscarded());
+  ASSERT_TRUE(data.isReady());
+  EXPECT_EQ("hello world", data.get());
 }
 
 
@@ -211,16 +214,16 @@ TEST_F(ZooKeeperTest, GroupCancelWithDis
   EXPECT_EQ(1, memberships.get().size());
   EXPECT_EQ(1, memberships.get().count(membership.get()));
 
-  process::Future<std::string> info = group.info(membership.get());
+  process::Future<std::string> data = group.data(membership.get());
 
-  EXPECT_TRUE(info.isPending());
+  EXPECT_TRUE(data.isPending());
 
-  info.await();
+  data.await();
 
-  ASSERT_FALSE(info.isFailed()) << info.failure();
-  ASSERT_FALSE(info.isDiscarded());
-  ASSERT_TRUE(info.isReady());
-  EXPECT_EQ("hello world", info.get());
+  ASSERT_FALSE(data.isFailed()) << data.failure();
+  ASSERT_FALSE(data.isDiscarded());
+  ASSERT_TRUE(data.isReady());
+  EXPECT_EQ("hello world", data.get());
 
   zks->shutdownNetwork();
 
@@ -243,6 +246,9 @@ TEST_F(ZooKeeperTest, GroupCancelWithDis
 
   ASSERT_TRUE(memberships.isReady());
   EXPECT_EQ(0, memberships.get().size());
+
+  ASSERT_TRUE(membership.get().cancelled().isReady());
+  ASSERT_TRUE(membership.get().cancelled().get());
 }
 
 
@@ -285,4 +291,78 @@ TEST_F(ZooKeeperTest, GroupWatchWithSess
 
   ASSERT_TRUE(memberships.isReady());
   EXPECT_EQ(0, memberships.get().size());
+
+  ASSERT_TRUE(membership.get().cancelled().isReady());
+  ASSERT_FALSE(membership.get().cancelled().get());
+}
+
+
+TEST_F(ZooKeeperTest, MultipleGroups)
+{
+  zookeeper::Group group1(zks->connectString(), NO_TIMEOUT, "/test/");
+  zookeeper::Group group2(zks->connectString(), NO_TIMEOUT, "/test/");
+
+  process::Future<zookeeper::Group::Membership> membership1 =
+    group1.join("group 1");
+
+  membership1.await();
+
+  ASSERT_FALSE(membership1.isFailed()) << membership1.failure();
+  ASSERT_FALSE(membership1.isDiscarded());
+  ASSERT_TRUE(membership1.isReady());
+
+  process::Future<zookeeper::Group::Membership> membership2 =
+    group2.join("group 2");
+
+  membership2.await();
+
+  ASSERT_FALSE(membership2.isFailed()) << membership2.failure();
+  ASSERT_FALSE(membership2.isDiscarded());
+  ASSERT_TRUE(membership2.isReady());
+
+  process::Future<std::set<zookeeper::Group::Membership> > memberships1 =
+    group1.watch();
+
+  memberships1.await();
+
+  ASSERT_TRUE(memberships1.isReady());
+  EXPECT_EQ(2, memberships1.get().size());
+  EXPECT_EQ(1, memberships1.get().count(membership1.get()));
+  EXPECT_EQ(1, memberships1.get().count(membership2.get()));
+
+  process::Future<std::set<zookeeper::Group::Membership> > memberships2 =
+    group2.watch();
+
+  memberships2.await();
+
+  ASSERT_TRUE(memberships2.isReady());
+  EXPECT_EQ(2, memberships2.get().size());
+  EXPECT_EQ(1, memberships2.get().count(membership1.get()));
+  EXPECT_EQ(1, memberships2.get().count(membership2.get()));
+
+  process::Future<bool> cancelled;
+
+  // Now watch the membership owned by group1 from group2.
+  foreach (const zookeeper::Group::Membership& membership, memberships2.get()) {
+    if (membership == membership1.get()) {
+      cancelled = membership.cancelled();
+      break;
+    }
+  }
+
+  process::Future<Option<int64_t> > session1 = group1.session();
+
+  session1.await();
+
+  ASSERT_FALSE(session1.isFailed()) << session1.failure();
+  ASSERT_FALSE(session1.isDiscarded());
+  ASSERT_TRUE(session1.isReady());
+  ASSERT_TRUE(session1.get().isSome());
+
+  zks->expireSession(session1.get().get());
+
+  cancelled.await();
+
+  ASSERT_TRUE(cancelled.isReady());
+  ASSERT_FALSE(cancelled.get());
 }

Modified: incubator/mesos/trunk/src/zookeeper/group.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.cpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.cpp Mon May 14 06:15:20 2012
@@ -48,9 +48,9 @@ public:
   void initialize();
 
   // Group implementation.
-  Future<Group::Membership> join(const string& info);
+  Future<Group::Membership> join(const string& data);
   Future<bool> cancel(const Group::Membership& membership);
-  Future<string> info(const Group::Membership& membership);
+  Future<string> data(const Group::Membership& membership);
   Future<set<Group::Membership> > watch(
       const set<Group::Membership>& expected);
   Future<Option<int64_t> > session();
@@ -64,9 +64,9 @@ public:
   void deleted(const string& path);
 
 private:
-  Result<Group::Membership> doJoin(const string& info);
+  Result<Group::Membership> doJoin(const string& data);
   Result<bool> doCancel(const Group::Membership& membership);
-  Result<string> doInfo(const Group::Membership& membership);
+  Result<string> doData(const Group::Membership& membership);
 
   // Attempts to cache the current set of memberships.
   bool cache();
@@ -108,8 +108,8 @@ private:
 
   struct Join
   {
-    Join(const string& _info) : info(_info) {}
-    string info;
+    Join(const string& _data) : data(_data) {}
+    string data;
     Promise<Group::Membership> promise;
   };
 
@@ -121,9 +121,9 @@ private:
     Promise<bool> promise;
   };
 
-  struct Info
+  struct Data
   {
-    Info(const Group::Membership& _membership)
+    Data(const Group::Membership& _membership)
       : membership(_membership) {}
     Group::Membership membership;
     Promise<string> promise;
@@ -140,18 +140,37 @@ private:
   struct {
     queue<Join*> joins;
     queue<Cancel*> cancels;
-    queue<Info*> infos;
+    queue<Data*> datas;
     queue<Watch*> watches;
   } pending;
 
   bool retrying;
 
-  map<Group::Membership, string> owned;
-
-  Option<set<Group::Membership> > memberships; // The cache.
+  // Expected ZooKeeper sequence numbers (either owned/created by this
+  // group instance or not) and the promise we associate with their
+  // "cancellation" (i.e., no longer part of the group).
+  map<uint64_t, Promise<bool>*> owned;
+  map<uint64_t, Promise<bool>*> unowned;
+
+  // Cache of owned + unowned, where 'None' represents an invalid
+  // cache and 'Some' represents a valid cache.
+  Option<set<Group::Membership> > memberships;
 };
 
 
+// Helper for failing a queue of promises.
+template <typename T>
+void fail(queue<T*>* queue, const string& message)
+{
+  while (!queue->empty()) {
+    T* t = queue->front();
+    queue->pop();
+    t->promise.fail(message);
+    delete t;
+  }
+}
+
+
 GroupProcess::GroupProcess(
     const string& _servers,
     const seconds& _timeout,
@@ -171,6 +190,11 @@ GroupProcess::GroupProcess(
 
 GroupProcess::~GroupProcess()
 {
+  fail(&pending.joins, "No longer watching group");
+  fail(&pending.cancels, "No longer watching group");
+  fail(&pending.datas, "No longer watching group");
+  fail(&pending.watches, "No longer watching group");
+
   delete zk;
   delete watcher;
 }
@@ -186,14 +210,14 @@ void GroupProcess::initialize()
 }
 
 
-Future<Group::Membership> GroupProcess::join(const string& info)
+Future<Group::Membership> GroupProcess::join(const string& data)
 {
   if (error.isSome()) {
     Promise<Group::Membership> promise;
     promise.fail(error.get());
     return promise.future();
   } else if (state != CONNECTED) {
-    Join* join = new Join(info);
+    Join* join = new Join(data);
     pending.joins.push(join);
     return join->promise.future();
   }
@@ -206,14 +230,14 @@ Future<Group::Membership> GroupProcess::
   // client can assume a happens-before ordering of operations (i.e.,
   // the first request will happen before the second, etc).
 
-  Result<Group::Membership> membership = doJoin(info);
+  Result<Group::Membership> membership = doJoin(data);
 
   if (membership.isNone()) { // Try again later.
     if (!retrying) {
       delay(RETRY_SECONDS, self(), &GroupProcess::retry, RETRY_SECONDS);
       retrying = true;
     }
-    Join* join = new Join(info);
+    Join* join = new Join(data);
     pending.joins.push(join);
     return join->promise.future();
   } else if (membership.isError()) {
@@ -222,8 +246,6 @@ Future<Group::Membership> GroupProcess::
     return promise.future();
   }
 
-  owned.insert(make_pair(membership.get(), info));
-
   return membership.get();
 }
 
@@ -234,8 +256,13 @@ Future<bool> GroupProcess::cancel(const 
     Promise<bool> promise;
     promise.fail(error.get());
     return promise.future();
-  } else if (owned.count(membership) == 0) {
-    return false; // TODO(benh): Should this be an error?
+  } else if (owned.count(membership.id()) == 0) {
+    // TODO(benh): Should this be an error? Right now a user can't
+    // differentiate when 'false' means they can't cancel because it's
+    // not owned or because it's already been cancelled (explicitly by
+    // them or implicitly due to session expiration or operator
+    // error).
+    return false;
   }
 
   if (state != CONNECTED) {
@@ -268,28 +295,28 @@ Future<bool> GroupProcess::cancel(const 
 }
 
 
-Future<string> GroupProcess::info(const Group::Membership& membership)
+Future<string> GroupProcess::data(const Group::Membership& membership)
 {
   if (error.isSome()) {
     Promise<string> promise;
     promise.fail(error.get());
     return promise.future();
   } else if (state != CONNECTED) {
-    Info* info = new Info(membership);
-    pending.infos.push(info);
-    return info->promise.future();
+    Data* data = new Data(membership);
+    pending.datas.push(data);
+    return data->promise.future();
   }
 
   // TODO(benh): Only attempt if the pending queue is empty so that a
   // client can assume a happens-before ordering of operations (i.e.,
   // the first request will happen before the second, etc).
 
-  Result<string> result = doInfo(membership);
+  Result<string> result = doData(membership);
 
   if (result.isNone()) { // Try again later.
-    Info* info = new Info(membership);
-    pending.infos.push(info);
-    return info->promise.future();
+    Data* data = new Data(membership);
+    pending.datas.push(data);
+    return data->promise.future();
   } else if (result.isError()) {
     Promise<string> promise;
     promise.fail(result.error());
@@ -423,11 +450,28 @@ void GroupProcess::reconnecting()
 
 void GroupProcess::expired()
 {
+  // Invalidate the cache.
   memberships = Option<set<Group::Membership> >::none();
-  owned.clear();
+
+  // Set all owned memberships as cancelled.
+  foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
+    cancelled->set(false); // Since this was not requested.
+    owned.erase(sequence); // Okay since iterating over a copy.
+    delete cancelled;
+  }
+
+  CHECK(owned.empty());
+
+  // Note that we DO NOT clear unowned. The next time we try and cache
+  // the memberships we'll trigger any cancelled unowned memberships
+  // then. We could imagine doing this for owned memberships too, but
+  // for now we proactively cancel them above.
+
   state = DISCONNECTED;
+
   delete zk;
   zk = new ZooKeeper(servers, timeout, watcher);
+
   state = CONNECTING;
 }
 
@@ -461,16 +505,16 @@ void GroupProcess::deleted(const string&
 }
 
 
-Result<Group::Membership> GroupProcess::doJoin(const string& info)
+Result<Group::Membership> GroupProcess::doJoin(const string& data)
 {
   CHECK(error.isNone()) << ": " << error.get();
   CHECK(state == CONNECTED);
 
   // Create a new ephemeral node to represent a new member and use the
-  // the specified info as it's contents.
+  // the specified data as it's contents.
   string result;
 
-  int code = zk->create(znode + "/", info, acl,
+  int code = zk->create(znode + "/", data, acl,
                         ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
@@ -485,7 +529,8 @@ Result<Group::Membership> GroupProcess::
         : "Failed to create ephemeral node in ZooKeeper");
   }
 
-  // Invalidate the cache.
+  // Invalidate the cache (it will/should get immediately populated
+  // via the 'updated' callback of our ZooKeeper watcher).
   memberships = Option<set<Group::Membership> >::none();
 
   // Save the sequence number but only grab the basename. Example:
@@ -495,9 +540,10 @@ Result<Group::Membership> GroupProcess::
   Try<uint64_t> sequence = utils::numify<uint64_t>(result);
   CHECK(sequence.isSome()) << sequence.error();
 
-  Group::Membership membership(sequence.get());
+  Promise<bool>* cancelled = new Promise<bool>();
+  owned[sequence.get()] = cancelled;
 
-  return membership;
+  return Group::Membership(sequence.get(), cancelled->future());
 }
 
 
@@ -529,16 +575,22 @@ Result<bool> GroupProcess::doCancel(cons
         : "Failed to remove ephemeral node in ZooKeeper");
   }
 
-  // Invalidate the cache.
+  // Invalidate the cache (it will/should get immediately populated
+  // via the 'updated' callback of our ZooKeeper watcher).
   memberships = Option<set<Group::Membership> >::none();
 
-  owned.erase(membership);
+  // Let anyone waiting know the membership has been cancelled.
+  CHECK(owned.count(membership.id()) == 1);
+  Promise<bool>* cancelled = owned[membership.id()];
+  cancelled->set(true);
+  owned.erase(membership.id());
+  delete cancelled;
 
   return true;
 }
 
 
-Result<string> GroupProcess::doInfo(const Group::Membership& membership)
+Result<string> GroupProcess::doData(const Group::Membership& membership)
 {
   CHECK(error.isNone()) << ": " << error.get();
   CHECK(state == CONNECTED);
@@ -574,7 +626,7 @@ Result<string> GroupProcess::doInfo(cons
 
 bool GroupProcess::cache()
 {
-  // Invalidate first.
+  // Invalidate first (if it's not already).
   memberships = Option<set<Group::Membership> >::none();
 
   // Get all children to determine current memberships.
@@ -596,8 +648,8 @@ bool GroupProcess::cache()
     return false;
   }
 
-  // Convert results to memberships.
-  set<Group::Membership> current;
+  // Convert results to sequence numbers.
+  set<uint64_t> sequences;
 
   foreach (const string& result, results) {
     Try<uint64_t> sequence = utils::numify<uint64_t>(result);
@@ -609,7 +661,40 @@ bool GroupProcess::cache()
       continue;
     }
 
-    current.insert(Group::Membership(sequence.get()));
+    sequences.insert(sequence.get());
+  }
+
+  // Cache current memberships, cancelling those that are now missing.
+  set<Group::Membership> current;
+
+  foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
+    if (sequences.count(sequence) == 0) {
+      cancelled->set(false);
+      owned.erase(sequence); // Okay since iterating over a copy.
+      delete cancelled;
+    } else {
+      current.insert(Group::Membership(sequence, cancelled->future()));
+      sequences.erase(sequence);
+    }
+  }
+
+  foreachpair (uint64_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
+    Promise<bool>* cancelled = unowned[sequence];
+    if (sequences.count(sequence) == 0) {
+      cancelled->set(false);
+      unowned.erase(sequence); // Okay since iterating over a copy.
+      delete cancelled;
+    } else {
+      current.insert(Group::Membership(sequence, cancelled->future()));
+      sequences.erase(sequence);
+    }
+  }
+
+  // Add any remaining (i.e., unexpected) sequences.
+  foreach (uint64_t sequence, sequences) {
+    Promise<bool>* cancelled = new Promise<bool>();
+    unowned[sequence] = cancelled;
+    current.insert(Group::Membership(sequence, cancelled->future()));
   }
 
   memberships = current;
@@ -621,16 +706,18 @@ bool GroupProcess::cache()
 void GroupProcess::update()
 {
   CHECK(memberships.isSome());
-  size_t size = pending.watches.size();
-  for (int i = 0; i < size; i++) {
-    if (memberships.get() != pending.watches.front()->expected) {
-      pending.watches.front()->promise.set(memberships.get());
+  const size_t size = pending.watches.size();
+  for (size_t i = 0; i < size; i++) {
+    Watch* watch = pending.watches.front();
+    if (memberships.get() != watch->expected) {
+      watch->promise.set(memberships.get());
+      pending.watches.pop();
+      delete watch;
     } else {
-      pending.watches.push(pending.watches.front());
+      // Don't delete the watch, but push it to the back of the queue.
+      pending.watches.push(watch);
+      pending.watches.pop();
     }
-    Watch* watch = pending.watches.front();
-    pending.watches.pop();
-    delete watch;
   }
 }
 
@@ -642,52 +729,56 @@ bool GroupProcess::sync()
 
   // Do joins.
   while (!pending.joins.empty()) {
-    Result<Group::Membership> membership = doJoin(pending.joins.front()->info);
+    Join* join = pending.joins.front();
+    Result<Group::Membership> membership = doJoin(join->data);
     if (membership.isNone()) {
       return false; // Try again later.
     } else if (membership.isError()) {
-      pending.joins.front()->promise.fail(membership.error());
+      join->promise.fail(membership.error());
     } else {
-      owned.insert(make_pair(membership.get(), pending.joins.front()->info));
-      pending.joins.front()->promise.set(membership.get());
+      join->promise.set(membership.get());
     }
-    Join* join = pending.joins.front();
     pending.joins.pop();
     delete join;
   }
 
   // Do cancels.
   while (!pending.cancels.empty()) {
-    Result<bool> cancellation = doCancel(pending.cancels.front()->membership);
+    Cancel* cancel = pending.cancels.front();
+    Result<bool> cancellation = doCancel(cancel->membership);
     if (cancellation.isNone()) {
       return false; // Try again later.
     } else if (cancellation.isError()) {
-      pending.cancels.front()->promise.fail(cancellation.error());
+      cancel->promise.fail(cancellation.error());
     } else {
-      pending.cancels.front()->promise.set(cancellation.get());
+      cancel->promise.set(cancellation.get());
     }
-    Cancel* cancel = pending.cancels.front();
     pending.cancels.pop();
     delete cancel;
   }
 
-  // Do infos.
-  while (!pending.infos.empty()) {
+  // Do datas.
+  while (!pending.datas.empty()) {
+    Data* data = pending.datas.front();
     // TODO(benh): Ignore if future has been discarded?
-    Result<string> result = doInfo(pending.infos.front()->membership);
+    Result<string> result = doData(data->membership);
     if (result.isNone()) {
       return false; // Try again later.
     } else if (result.isError()) {
-      pending.infos.front()->promise.fail(result.error());
+      data->promise.fail(result.error());
     } else {
-      pending.infos.front()->promise.set(result.get());
+      data->promise.set(result.get());
     }
-    Info* info = pending.infos.front();
-    pending.infos.pop();
-    delete info;
+    pending.datas.pop();
+    delete data;
   }
 
-  // Get cache of memberships if we don't have one.
+  // Get cache of memberships if we don't have one. Note that we do
+  // this last because any joins or cancels above will invalidate our
+  // cache, so it would be nice to get it validated again at the
+  // end. The side-effect here is that users will learn of joins and
+  // cancels first through any explicit futures for them rather than
+  // watches.
   if (memberships.isNone()) {
     if (!cache()) {
       return false; // Try again later (if no error).
@@ -716,25 +807,13 @@ void GroupProcess::retry(double seconds)
 }
 
 
-template <typename T>
-void fail(queue<T*>* queue, const string& message)
-{
-  while (!queue->empty()) {
-    T* t = queue->front();
-    queue->pop();
-    t->promise.fail(message);
-    delete t;
-  }
-}
-
-
 void GroupProcess::abort()
 {
   CHECK(error.isSome());
 
   fail(&pending.joins, error.get());
   fail(&pending.cancels, error.get());
-  fail(&pending.infos, error.get());
+  fail(&pending.datas, error.get());
   fail(&pending.watches, error.get());
 }
 
@@ -758,9 +837,9 @@ Group::~Group()
 }
 
 
-Future<Group::Membership> Group::join(const string& info)
+Future<Group::Membership> Group::join(const string& data)
 {
-  return dispatch(process, &GroupProcess::join, info);
+  return dispatch(process, &GroupProcess::join, data);
 }
 
 
@@ -770,9 +849,9 @@ Future<bool> Group::cancel(const Group::
 }
 
 
-Future<string> Group::info(const Group::Membership& membership)
+Future<string> Group::data(const Group::Membership& membership)
 {
-  return dispatch(process, &GroupProcess::info, membership);
+  return dispatch(process, &GroupProcess::data, membership);
 }
 
 

Modified: incubator/mesos/trunk/src/zookeeper/group.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper/group.hpp?rev=1338037&r1=1338036&r2=1338037&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper/group.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper/group.hpp Mon May 14 06:15:20 2012
@@ -23,7 +23,9 @@ class Group
 public:
   // Represents a group membership. Note that we order memberships by
   // membership id (that is, an older membership is ordered before a
-  // younger membership).
+  // younger membership). In addition, we do not use the "cancelled"
+  // future to compare memberships so that two memberships created
+  // from different Group instances will still be considered the same.
   struct Membership
   {
     bool operator == (const Membership& that) const
@@ -56,12 +58,25 @@ public:
       return sequence;
     }
 
+    // Returns a future that is only satisfied once this membership
+    // has been cancelled. In which case, the value of the future is
+    // true if you own this membership and cancelled it by invoking
+    // Group::cancel. Otherwise, the value of the future is false (and
+    // could signify cancellation due to a sesssion expiration or
+    // operator error).
+    process::Future<bool> cancelled() const
+    {
+      return cancelled_;
+    }
+
   private:
     friend class GroupProcess; // Creates and manages memberships.
 
-    Membership(uint64_t _sequence) : sequence(_sequence) {}
+    Membership(uint64_t _sequence, const process::Future<bool>& cancelled)
+      : sequence(_sequence), cancelled_(cancelled) {}
 
     uint64_t sequence;
+    process::Future<bool> cancelled_;
   };
 
   // Constructs this group using the specified ZooKeeper servers (list
@@ -74,19 +89,19 @@ public:
 
   // Returns the result of trying to join a "group" in ZooKeeper. If
   // succesful, an "owned" membership will be returned whose
-  // retrievable information will be a copy of the specified
-  // parameter. A membership is not "renewed" in the event of a
-  // ZooKeeper session expiration. Instead, a client should watch the
-  // group memberships and rejoin the group as appropriate.
-  process::Future<Membership> join(const std::string& info);
+  // retrievable data will be a copy of the specified parameter. A
+  // membership is not "renewed" in the event of a ZooKeeper session
+  // expiration. Instead, a client should watch the group memberships
+  // and rejoin the group as appropriate.
+  process::Future<Membership> join(const std::string& data);
 
   // Returns the result of trying to cancel a membership. Note that
   // only memberships that are "owned" (see join) can be canceled.
   process::Future<bool> cancel(const Membership& membership);
 
-  // Returns the result of trying to fetch the information associated
-  // with a group membership.
-  process::Future<std::string> info(const Membership& membership);
+  // Returns the result of trying to fetch the data associated with a
+  // group membership.
+  process::Future<std::string> data(const Membership& membership);
 
   // Returns a future that gets set when the group memberships differ
   // from the "expected" memberships specified.