You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/01/23 00:59:45 UTC
[2/4] git commit: Improved Group to take label as an option.
Improved Group to take label as an option.
Review: https://reviews.apache.org/r/17171
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/326172ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/326172ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/326172ec
Branch: refs/heads/master
Commit: 326172ec4571de0952278f005d6cba64d4dfc120
Parents: 42946ed
Author: Vinod Kone <vi...@twitter.com>
Authored: Tue Jan 21 16:18:32 2014 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Jan 22 15:51:12 2014 -0800
----------------------------------------------------------------------
src/tests/group_tests.cpp | 34 ++++++++++++++
src/zookeeper/group.cpp | 99 ++++++++++++++++++++++++++--------------
src/zookeeper/group.hpp | 43 ++++++++++++-----
src/zookeeper/zookeeper.hpp | 29 ++++++------
4 files changed, 145 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index ac1942b..5d4240c 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -372,3 +372,37 @@ TEST_F(GroupTest, RetryableErrors)
AWAIT_READY(cancellation);
AWAIT_READY(connected);
}
+
+
+TEST_F(GroupTest, LabelledGroup)
+{
+ Group group(server->connectString(), NO_TIMEOUT, "/test/");
+
+ // Join a group with label.
+ Future<Group::Membership> membership = group.join(
+ "hello world", std::string("testlabel"));
+
+ AWAIT_READY(membership);
+
+ Future<std::set<Group::Membership> > memberships = group.watch();
+
+ AWAIT_READY(memberships);
+ EXPECT_EQ(1u, memberships.get().size());
+ EXPECT_EQ(1u, memberships.get().count(membership.get()));
+
+ Future<std::string> data = group.data(membership.get());
+
+ AWAIT_EXPECT_EQ("hello world", data);
+
+ Future<bool> cancellation = group.cancel(membership.get());
+
+ AWAIT_EXPECT_EQ(true, cancellation);
+
+ memberships = group.watch(memberships.get());
+
+ AWAIT_READY(memberships);
+ EXPECT_EQ(0u, memberships.get().size());
+
+ ASSERT_TRUE(membership.get().cancelled().isReady());
+ ASSERT_TRUE(membership.get().cancelled().get());
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 72ebe69..a50da22 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -13,6 +13,7 @@
#include <stout/none.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
+#include <stout/path.hpp>
#include <stout/result.hpp>
#include <stout/some.hpp>
#include <stout/strings.hpp>
@@ -127,12 +128,14 @@ void GroupProcess::initialize()
}
-Future<Group::Membership> GroupProcess::join(const string& data)
+Future<Group::Membership> GroupProcess::join(
+ const string& data,
+ const Option<string>& label)
{
if (error.isSome()) {
return Failure(error.get());
} else if (state != READY) {
- Join* join = new Join(data);
+ Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
}
@@ -145,14 +148,14 @@ Future<Group::Membership> GroupProcess::join(const string& data)
// client can assume a happens-before ordering of operations (i.e.,
// the first request will happen before the second, etc).
- Result<Group::Membership> membership = doJoin(data);
+ Result<Group::Membership> membership = doJoin(data, label);
if (membership.isNone()) { // Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
}
- Join* join = new Join(data);
+ Join* join = new Join(data, label);
pending.joins.push(join);
return join->promise.future();
} else if (membership.isError()) {
@@ -526,7 +529,9 @@ void GroupProcess::deleted(const string& path)
}
-Result<Group::Membership> GroupProcess::doJoin(const string& data)
+Result<Group::Membership> GroupProcess::doJoin(
+ const string& data,
+ const Option<string>& label)
{
CHECK_EQ(state, READY);
@@ -534,8 +539,12 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
// the specified data as it's contents.
string result;
- int code = zk->create(znode + "/", data, acl,
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+ int code = zk->create(
+ znode + "/" + (label.isSome() ? (label.get() + "_") : ""),
+ data,
+ acl,
+ ZOO_SEQUENCE | ZOO_EPHEMERAL,
+ &result);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
@@ -551,19 +560,24 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
memberships = None();
// Save the sequence number but only grab the basename. Example:
- // "/path/to/znode/0000000131" => "0000000131".
+ // "/path/to/znode/label_0000000131" => "0000000131".
Try<string> basename = os::basename(result);
if (basename.isError()) {
return Error("Failed to get the sequence number: " + basename.error());
}
- Try<int32_t> sequence = numify<int32_t>(basename.get());
+ // Strip the label before grabbing the sequence number.
+ string node = label.isSome()
+ ? strings::remove(basename.get(), label.get() + "_")
+ : basename.get();
+
+ Try<int32_t> sequence = numify<int32_t>(node);
CHECK_SOME(sequence);
Promise<bool>* cancelled = new Promise<bool>();
owned[sequence.get()] = cancelled;
- return Group::Membership(sequence.get(), cancelled->future());
+ return Group::Membership(sequence.get(), label, cancelled->future());
}
@@ -571,11 +585,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
{
CHECK_EQ(state, READY);
- Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
- CHECK_SOME(sequence);
-
- string path = znode + "/" + sequence.get();
+ string path = path::join(znode, zkBasename(membership));
LOG(INFO) << "Trying to remove '" << path << "' in ZooKeeper";
@@ -614,11 +624,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
{
CHECK_EQ(state, READY);
- Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
-
- CHECK_SOME(sequence);
-
- string path = znode + "/" + sequence.get();
+ string path = path::join(znode, zkBasename(membership));
LOG(INFO) << "Trying to get '" << path << "' in ZooKeeper";
@@ -654,15 +660,21 @@ Try<bool> GroupProcess::cache()
CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return false;
} else if (code != ZOK) {
- return Error("Non-retryable error attempting to get children of '" + znode +
- "' in ZooKeeper: " + zk->message(code));
+ return Error("Non-retryable error attempting to get children of '" + znode
+ + "' in ZooKeeper: " + zk->message(code));
}
- // Convert results to sequence numbers.
- set<int32_t> sequences;
+ // Convert results to sequence numbers and (optionally) labels.
+ hashmap<int32_t, Option<string> > sequences;
foreach (const string& result, results) {
- Try<int32_t> sequence = numify<int32_t>(result);
+ vector<string> tokens = strings::tokenize(result, "_");
+ Option<string> label = None();
+ if (tokens.size() > 1) {
+ label = tokens[0];
+ }
+
+ Try<int32_t> sequence = numify<int32_t>(tokens.back());
// Skip it if it couldn't be converted to a number.
if (sequence.isError()) {
@@ -671,39 +683,43 @@ Try<bool> GroupProcess::cache()
continue;
}
- sequences.insert(sequence.get());
+ sequences[sequence.get()] = label;
}
// Cache current memberships, cancelling those that are now missing.
set<Group::Membership> current;
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(owned)) {
- if (sequences.count(sequence) == 0) {
+ if (!sequences.contains(sequence)) {
cancelled->set(false);
owned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(
+ sequence, sequences[sequence], cancelled->future()));
+
sequences.erase(sequence);
}
}
foreachpair (int32_t sequence, Promise<bool>* cancelled, utils::copy(unowned)) {
- if (sequences.count(sequence) == 0) {
+ if (!sequences.contains(sequence)) {
cancelled->set(false);
unowned.erase(sequence); // Okay since iterating over a copy.
delete cancelled;
} else {
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(
+ sequence, sequences[sequence], cancelled->future()));
+
sequences.erase(sequence);
}
}
// Add any remaining (i.e., unexpected) sequences.
- foreach (int32_t sequence, sequences) {
+ foreachpair (int32_t sequence, const Option<string>& label, sequences) {
Promise<bool>* cancelled = new Promise<bool>();
unowned[sequence] = cancelled;
- current.insert(Group::Membership(sequence, cancelled->future()));
+ current.insert(Group::Membership(sequence, label, cancelled->future()));
}
memberships = current;
@@ -759,7 +775,7 @@ Try<bool> GroupProcess::sync()
// Do joins.
while (!pending.joins.empty()) {
Join* join = pending.joins.front();
- Result<Group::Membership> membership = doJoin(join->data);
+ Result<Group::Membership> membership = doJoin(join->data, join->label);
if (membership.isNone()) {
return false; // Try again later.
} else if (membership.isError()) {
@@ -877,6 +893,17 @@ void GroupProcess::abort(const string& message)
}
+string GroupProcess::zkBasename(const Group::Membership& membership)
+{
+ Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
+ CHECK_SOME(sequence);
+
+ return membership.label.isSome()
+ ? (membership.label.get() + "_" + sequence.get())
+ : sequence.get();
+}
+
+
Group::Group(const string& servers,
const Duration& timeout,
const string& znode,
@@ -903,9 +930,11 @@ Group::~Group()
}
-Future<Group::Membership> Group::join(const string& data)
+Future<Group::Membership> Group::join(
+ const string& data,
+ const Option<string>& label)
{
- return dispatch(process, &GroupProcess::join, data);
+ return dispatch(process, &GroupProcess::join, data, label);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 354229f..1ce1519 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -3,13 +3,16 @@
#include <map>
#include <set>
+#include <string>
#include "process/future.hpp"
#include "process/timer.hpp"
+#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/none.hpp>
#include <stout/option.hpp>
+#include <stout/try.hpp>
#include "zookeeper/authentication.hpp"
#include "zookeeper/url.hpp"
@@ -85,10 +88,13 @@ public:
private:
friend class GroupProcess; // Creates and manages memberships.
- Membership(int32_t _sequence, const process::Future<bool>& cancelled)
- : sequence(_sequence), cancelled_(cancelled) {}
+ Membership(int32_t _sequence,
+ const Option<std::string>& _label,
+ const process::Future<bool>& cancelled)
+ : sequence(_sequence), label(_label), cancelled_(cancelled) {}
const int32_t sequence;
+ const Option<std::string> label;
process::Future<bool> cancelled_;
};
@@ -103,13 +109,16 @@ public:
~Group();
- // Returns the result of trying to join a "group" in ZooKeeper. If
- // succesful, an "owned" membership will be returned whose
- // retrievable data will be a copy of the specified parameter. A
- // membership is not "renewed" in the event of a ZooKeeper session
- // expiration. Instead, a client should watch the group memberships
- // and rejoin the group as appropriate.
- process::Future<Membership> join(const std::string& data);
+ // Returns the result of trying to join a "group" in ZooKeeper.
+ // If "label" is provided the newly created znode contains "label_"
+ // as the prefix. If join is successful, an "owned" membership will
+ // be returned whose retrievable data will be a copy of the
+ // specified parameter. A membership is not "renewed" in the event
+ // of a ZooKeeper session expiration. Instead, a client should watch
+ // the group memberships and rejoin the group as appropriate.
+ process::Future<Membership> join(
+ const std::string& data,
+ const Option<std::string>& label = None());
// Returns the result of trying to cancel a membership. Note that
// only memberships that are "owned" (see join) can be canceled.
@@ -150,8 +159,14 @@ public:
static const Duration RETRY_INTERVAL;
+ // Helper function that returns the basename of the znode of
+ // the membership.
+ static std::string zkBasename(const Group::Membership& membership);
+
// Group implementation.
- process::Future<Group::Membership> join(const std::string& data);
+ process::Future<Group::Membership> join(
+ const std::string& data,
+ const Option<std::string>& label);
process::Future<bool> cancel(const Group::Membership& membership);
process::Future<std::string> data(const Group::Membership& membership);
process::Future<std::set<Group::Membership> > watch(
@@ -167,7 +182,9 @@ public:
void deleted(const std::string& path);
private:
- Result<Group::Membership> doJoin(const std::string& data);
+ Result<Group::Membership> doJoin(
+ const std::string& data,
+ const Option<std::string>& label);
Result<bool> doCancel(const Group::Membership& membership);
Result<std::string> doData(const Group::Membership& membership);
@@ -234,8 +251,10 @@ private:
struct Join
{
- Join(const std::string& _data) : data(_data) {}
+ Join(const std::string& _data, const Option<std::string>& _label)
+ : data(_data), label(_label) {}
std::string data;
+ const Option<std::string> label;
process::Promise<Group::Membership> promise;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/326172ec/src/zookeeper/zookeeper.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/zookeeper.hpp b/src/zookeeper/zookeeper.hpp
index 1b4e2ed..f50aca6 100644
--- a/src/zookeeper/zookeeper.hpp
+++ b/src/zookeeper/zookeeper.hpp
@@ -187,12 +187,13 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int create(const std::string &path,
- const std::string &data,
- const ACL_vector &acl,
- int flags,
- std::string *result,
- bool recursive = false);
+ int create(
+ const std::string &path,
+ const std::string &data,
+ const ACL_vector &acl,
+ int flags,
+ std::string *result,
+ bool recursive = false);
/**
* \brief delete a node in zookeeper synchronously.
@@ -253,10 +254,11 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int get(const std::string &path,
- bool watch,
- std::string *result,
- Stat *stat);
+ int get(
+ const std::string &path,
+ bool watch,
+ std::string *result,
+ Stat *stat);
/**
* \brief lists the children of a node synchronously.
@@ -274,9 +276,10 @@ public:
* ZINVALIDSTATE - state is ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE
* ZMARSHALLINGERROR - failed to marshall a request; possibly, out of memory
*/
- int getChildren(const std::string &path,
- bool watch,
- std::vector<std::string> *results);
+ int getChildren(
+ const std::string &path,
+ bool watch,
+ std::vector<std::string> *results);
/**
* \brief sets the data associated with a node.