You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:45 UTC

[2/4] git commit: Improved Group to take label as an option.

Improved Group to take label as an option.

Review: https://reviews.apache.org/r/17171


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/326172ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/326172ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/326172ec

Branch: refs/heads/master
Commit: 326172ec4571de0952278f005d6cba64d4dfc120
Parents: 42946ed
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 16:18:32 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800

----------------------------------------------------------------------
 src/tests/group_tests.cpp   | 34 ++++++++++++++
 src/zookeeper/group.cpp     | 99 ++++++++++++++++++++++++++--------------
 src/zookeeper/group.hpp     | 43 ++++++++++++-----
 src/zookeeper/zookeeper.hpp | 29 ++++++------
 4 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index ac1942b..5d4240c 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -372,3 +372,37 @@ TEST_F(GroupTest, RetryableErrors)
   AWAIT_READY(cancellation);
   AWAIT_READY(connected);
 }
+
+
+TEST_F(GroupTest, LabelledGroup)
+{
+  Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+  // Join a group with label.
+  Future<Group::Membership> membership = group.join(
+      "hello world", std::string("testlabel"));
+
+  AWAIT_READY(membership);
+
+  Future<std::set<Group::Membership> > memberships = group.watch();
+
+  AWAIT_READY(memberships);
+  EXPECT_EQ(1u, memberships.get().size());
+  EXPECT_EQ(1u, memberships.get().count(membership.get()));
+
+  Future<std::string> data = group.data(membership.get());
+
+  AWAIT_EXPECT_EQ("hello world", data);
+
+  Future<bool> cancellation = group.cancel(membership.get());
+
+  AWAIT_EXPECT_EQ(true, cancellation);
+
+  memberships = group.watch(memberships.get());
+
+  AWAIT_READY(memberships);
+  EXPECT_EQ(0u, memberships.get().size());
+
+  ASSERT_TRUE(membership.get().cancelled().isReady());
+  ASSERT_TRUE(membership.get().cancelled().get());
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 72ebe69..a50da22 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -13,6 +13,7 @@
 #include <stout/none.hpp>
 #include <stout/numify.hpp>
 #include <stout/os.hpp>
+#include <stout/path.hpp>
 #include <stout/result.hpp>
 #include <stout/some.hpp>
 #include <stout/strings.hpp>
@@ -127,12 +128,14 @@ void GroupProcess::initialize()
 }
 
 
-Future<Group::Membership> GroupProcess::join(const string& data)
+Future<Group::Membership> GroupProcess::join(
+    const string& data,
+    const Option<string>& label)
 {
   if (error.isSome()) {
     return Failure(error.get());
   } else if (state != READY) {
-    Join* join = new Join(data);
+    Join* join = new Join(data, label);
     pending.joins.push(join);
     return join->promise.future();
   }
@@ -145,14 +148,14 @@ Future<Group::Membership> GroupProcess::join(const string& data)
   // client can assume a happens-before ordering of operations (i.e.,
   // the first request will happen before the second, etc).
 
-  Result<Group::Membership> membership = doJoin(data);
+  Result<Group::Membership> membership = doJoin(data, label);
 
   if (membership.isNone()) { // Try again later.
     if (!retrying) {
       delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
       retrying = true;
     }
-    Join* join = new Join(data);
+    Join* join = new Join(data, label);
     pending.joins.push(join);
     return join->promise.future();
   } else if (membership.isError()) {
@@ -526,7 +529,9 @@ void GroupProcess::deleted(const string& path)
 }
 
 
-Result<Group::Membership> GroupProcess::doJoin(const string& data)
+Result<Group::Membership> GroupProcess::doJoin(
+    const string& data,
+    const Option<string>& label)
 {
   CHECK_EQ(state, READY);
 
@@ -534,8 +539,12 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
   // the specified data as it's contents.
   string result;
 
-  int code = zk->create(znode + "/", data, acl,
-                        ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+  int code = zk->create(
+      znode + "/" + (label.isSome() ? (label.get() + "_") : ""),
+      data,
+      acl,
+      ZOO_SEQUENCE | ZOO_EPHEMERAL,
+      &result);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
     CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
@@ -551,19 +560,24 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
   memberships = None();
 
   // Save the sequence number but only grab the basename. Example:
-  // "/path/to/znode/0000000131" => "0000000131".
+  // "/path/to/znode/label_0000000131" => "0000000131".
   Try<string> basename = os::basename(result);
   if (basename.isError()) {
     return Error("Failed to get the sequence number: " + basename.error());
   }
 
-  Try<int32_t> sequence = numify<int32_t>(basename.get());
+  // Strip the label before grabbing the sequence number.
+  string node = label.isSome()
+      ? strings::remove(basename.get(), label.get() + "_")
+      : basename.get();
+
+  Try<int32_t> sequence = numify<int32_t>(node);
   CHECK_SOME(sequence);
 
   Promise<bool>* cancelled = new Promise<bool>();
   owned[sequence.get()] = cancelled;
 
-  return Group::Membership(sequence.get(), cancelled->future());
+  return Group::Membership(sequence.get(), label, cancelled->future());
 }
 
 
@@ -571,11 +585,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 {
   CHECK_EQ(state, READY);
 
-  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
-  CHECK_SOME(sequence);
-
-  string path = znode + "/" + sequence.get();
+  string path = path::join(znode, zkBasename(membership));
 
   LOG(INFO) << "Trying to remove '" << path << "' in ZooKeeper";
 
@@ -614,11 +624,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
 {
   CHECK_EQ(state, READY);
 
-  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
-  CHECK_SOME(sequence);
-
-  string path = znode + "/" + sequence.get();
+  string path = path::join(znode, zkBasename(membership));
 
   LOG(INFO) << "Trying to get '" << path << "' in ZooKeeper";
 
@@ -654,15 +660,21 @@ Try<bool> GroupProcess::cache()
     CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return false;
   } else if (code != ZOK) {
-    return Error("Non-retryable error attempting to get children of '" + znode +
-                 "' in ZooKeeper: " + zk->message(code));
+    return Error("Non-retryable error attempting to get children of '" + znode
+                 + "' in ZooKeeper: " + zk->message(code));
   }
 
-  // Convert results to sequence numbers.
-  set<int32_t> sequences;
+  // Convert results to sequence numbers and (optionally) labels.
+  hashmap<int32_t, Option<string> > sequences;
 
   foreach (const string& result, results) {
-    Try<int32_t> sequence = numify<int32_t>(result);
+    vector<string> tokens = strings::tokenize(result, "_");
+    Option<string> label = None();
+    if (tokens.size() > 1) {
+      label = tokens[0];
+    }
+
+    Try<int32_t> sequence = numify<int32_t>(tokens.back());
 
     // Skip it if it couldn't be converted to a number.
     if (sequence.isError()) {
@@ -671,39 +683,43 @@ Try<bool> GroupProcess::cache()
       continue;
     }
 
-    sequences.insert(sequence.get());
+    sequences[sequence.get()] = label;
   }
 
   // Cache current memberships, cancelling those that are now missing.
   set<Group::Membership> current;
 
   foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
-    if (sequences.count(sequence) == 0) {
+    if (!sequences.contains(sequence)) {
       cancelled->set(false);
       owned.erase(sequence); // Okay since iterating over a copy.
       delete cancelled;
     } else {
-      current.insert(Group::Membership(sequence, cancelled->future()));
+      current.insert(Group::Membership(
+          sequence, sequences[sequence], cancelled->future()));
+
       sequences.erase(sequence);
     }
   }
 
   foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
-    if (sequences.count(sequence) == 0) {
+    if (!sequences.contains(sequence)) {
       cancelled->set(false);
       unowned.erase(sequence); // Okay since iterating over a copy.
       delete cancelled;
     } else {
-      current.insert(Group::Membership(sequence, cancelled->future()));
+      current.insert(Group::Membership(
+          sequence, sequences[sequence], cancelled->future()));
+
       sequences.erase(sequence);
     }
   }
 
   // Add any remaining (i.e., unexpected) sequences.
-  foreach (int32_t sequence, sequences) {
+  foreachpair (int32_t sequence, const Option<string>& label, sequences) {
     Promise<bool>* cancelled = new Promise<bool>();
     unowned[sequence] = cancelled;
-    current.insert(Group::Membership(sequence, cancelled->future()));
+    current.insert(Group::Membership(sequence, label, cancelled->future()));
   }
 
   memberships = current;
@@ -759,7 +775,7 @@ Try<bool> GroupProcess::sync()
   // Do joins.
   while (!pending.joins.empty()) {
     Join* join = pending.joins.front();
-    Result<Group::Membership> membership = doJoin(join->data);
+    Result<Group::Membership> membership = doJoin(join->data, join->label);
     if (membership.isNone()) {
       return false; // Try again later.
     } else if (membership.isError()) {
@@ -877,6 +893,17 @@ void GroupProcess::abort(const string& message)
 }
 
 
+string GroupProcess::zkBasename(const Group::Membership& membership)
+{
+  Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
+  CHECK_SOME(sequence);
+
+  return membership.label.isSome()
+      ? (membership.label.get() + "_" + sequence.get())
+      : sequence.get();
+}
+
+
 Group::Group(const string& servers,
              const Duration& timeout,
              const string& znode,
@@ -903,9 +930,11 @@ Group::~Group()
 }
 
 
-Future<Group::Membership> Group::join(const string& data)
+Future<Group::Membership> Group::join(
+    const string& data,
+    const Option<string>& label)
 {
-  return dispatch(process, &GroupProcess::join, data);
+  return dispatch(process, &GroupProcess::join, data, label);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 354229f..1ce1519 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -3,13 +3,16 @@
 
 #include <map>
 #include <set>
+#include <string>
 
 #include "process/future.hpp"
 #include "process/timer.hpp"
 
+#include <stout/check.hpp>
 #include <stout/duration.hpp>
 #include <stout/none.hpp>
 #include <stout/option.hpp>
+#include <stout/try.hpp>
 
 #include "zookeeper/authentication.hpp"
 #include "zookeeper/url.hpp"
@@ -85,10 +88,13 @@ public:
   private:
     friend class GroupProcess; // Creates and manages memberships.
 
-    Membership(int32_t _sequence, const process::Future<bool>& cancelled)
-      : sequence(_sequence), cancelled_(cancelled) {}
+    Membership(int32_t _sequence,
+               const Option<std::string>& _label,
+               const process::Future<bool>& cancelled)
+      : sequence(_sequence), label(_label), cancelled_(cancelled) {}
 
     const int32_t sequence;
+    const Option<std::string> label;
     process::Future<bool> cancelled_;
   };
 
@@ -103,13 +109,16 @@ public:
 
   ~Group();
 
-  // Returns the result of trying to join a "group" in ZooKeeper. If
-  // succesful, an "owned" membership will be returned whose
-  // retrievable data will be a copy of the specified parameter. A
-  // membership is not "renewed" in the event of a ZooKeeper session
-  // expiration. Instead, a client should watch the group memberships
-  // and rejoin the group as appropriate.
-  process::Future<Membership> join(const std::string& data);
+  // Returns the result of trying to join a "group" in ZooKeeper.
+  // If "label" is provided the newly created znode contains "label_"
+  // as the prefix. If join is successful, an "owned" membership will
+  // be returned whose retrievable data will be a copy of the
+  // specified parameter. A membership is not "renewed" in the event
+  // of a ZooKeeper session expiration. Instead, a client should watch
+  // the group memberships and rejoin the group as appropriate.
+  process::Future<Membership> join(
+      const std::string& data,
+      const Option<std::string>& label = None());
 
   // Returns the result of trying to cancel a membership. Note that
   // only memberships that are "owned" (see join) can be canceled.
@@ -150,8 +159,14 @@ public:
 
   static const Duration RETRY_INTERVAL;
 
+  // Helper function that returns the basename of the znode of
+  // the membership.
+  static std::string zkBasename(const Group::Membership& membership);
+
   // Group implementation.
-  process::Future<Group::Membership> join(const std::string& data);
+  process::Future<Group::Membership> join(
+      const std::string& data,
+      const Option<std::string>& label);
   process::Future<bool> cancel(const Group::Membership& membership);
   process::Future<std::string> data(const Group::Membership& membership);
   process::Future<std::set<Group::Membership> > watch(
@@ -167,7 +182,9 @@ public:
   void deleted(const std::string& path);
 
 private:
-  Result<Group::Membership> doJoin(const std::string& data);
+  Result<Group::Membership> doJoin(
+      const std::string& data,
+      const Option<std::string>& label);
   Result<bool> doCancel(const Group::Membership& membership);
   Result<std::string> doData(const Group::Membership& membership);
 
@@ -234,8 +251,10 @@ private:
 
   struct Join
   {
-    Join(const std::string& _data) : data(_data) {}
+    Join(const std::string& _data, const Option<std::string>& _label)
+      : data(_data), label(_label) {}
     std::string data;
+    const Option<std::string> label;
     process::Promise<Group::Membership> promise;
   };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 1b4e2ed..f50aca6 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -187,12 +187,13 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int create(const std::string &path,
-	     const std::string &data,
-	     const ACL_vector &acl,
-	     int flags,
-	     std::string *result,
-	     bool recursive = false);
+  int create(
+      const std::string &path,
+      const std::string &data,
+      const ACL_vector &acl,
+      int flags,
+      std::string *result,
+      bool recursive = false);
 
   /**
    * \brief delete a node in zookeeper synchronously.
@@ -253,10 +254,11 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int get(const std::string &path,
-	  bool watch,
-	  std::string *result,
-	  Stat *stat);
+  int get(
+      const std::string &path,
+      bool watch,
+      std::string *result,
+      Stat *stat);
 
   /**
    * \brief lists the children of a node synchronously.
@@ -274,9 +276,10 @@ public:
    * ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
    * ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
    */
-  int getChildren(const std::string &path,
-		  bool watch,
-		  std::vector<std::string> *results);
+  int getChildren(
+      const std::string &path,
+      bool watch,
+      std::vector<std::string> *results);
 
   /**
    * \brief sets the data associated with a node.