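You are viewing a plain text version of this content. The canonical version is the original message in the commits@mesos.apache.org archive (its hyperlink was lost in this plain-text rendering).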
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/12/04 01:25:22 UTC
[3/4] git commit: Fixed Group to retry if authentication fails due to retryable errors.
Fixed Group to retry if authentication fails due to retryable errors.
See summary.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15706
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb9edfd0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb9edfd0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb9edfd0
Branch: refs/heads/master
Commit: fb9edfd04ca4dfae5c916c78dedbce86215b0a74
Parents: 52c286d
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Dec 3 14:21:22 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:21:22 2013 -0800
----------------------------------------------------------------------
src/tests/group_tests.cpp | 60 ++++++++
src/zookeeper/group.cpp | 304 ++++++++++++++++++++++++-----------------
src/zookeeper/group.hpp | 38 ++++--
3 files changed, 262 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index fb4c2f1..957256e 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -23,10 +23,12 @@
#include <string>
#include <process/future.hpp>
+#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <stout/gtest.hpp>
#include <stout/option.hpp>
+#include <stout/os.hpp>
#include "tests/zookeeper.hpp"
@@ -37,9 +39,11 @@ using namespace mesos::internal;
using namespace mesos::internal::tests;
using zookeeper::Group;
+using zookeeper::GroupProcess;
using process::Future;
+using testing::_;
class GroupTest : public ZooKeeperTest {};
@@ -310,3 +314,59 @@ TEST_F(GroupTest, GroupPathWithRestrictivePerms)
AWAIT_READY(successGroup.join("succeed"));
}
+
+
+// Verifies that Group operations can recover from retryable errors
+// caused by session expiration. The retries are not guaranteed to
+// happen; instead, this test tries to trigger them probabilistically
+// by repeatedly expiring the session.
+TEST_F(GroupTest, RetryableErrors)
{
+ Future<Nothing> connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+
+ zookeeper::Authentication auth("digest", "creator:creator");
+ Group group(server->connectString(), NO_TIMEOUT, "/test/", auth);
+
+ // Wait for Group to connect so that we can get hold of the session.
+ AWAIT_READY(connected);
+ Future<Option<int64_t> > session = group.session();
+ AWAIT_READY(session);
+ ASSERT_SOME(session.get());
+
+ // We repeatedly expire the session while Group operations are
+ // ongoing, which is meant to force retries of authenticate() and
+ // create() (the creation of the group's base path).
+ connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+ server->expireSession(session.get().get());
+
+ Future<Group::Membership> membership = group.join("hello world");
+
+ // Wait for Group to connect again so we can get hold of the new session.
+ AWAIT_READY(connected);
+ session = group.session();
+ AWAIT_READY(session);
+ ASSERT_SOME(session.get());
+ connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+ server->expireSession(session.get().get());
+
+ AWAIT_READY(membership);
+
+ // Wait for Group to connect again so we can get hold of the new session.
+ AWAIT_READY(connected);
+ session = group.session();
+ AWAIT_READY(session);
+ ASSERT_SOME(session.get());
+ connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+ server->expireSession(session.get().get());
+
+ Future<bool> cancellation = group.cancel(membership.get());
+
+ // Wait for Group to connect again so we can get hold of the new session.
+ AWAIT_READY(connected);
+ session = group.session();
+ AWAIT_READY(session);
+ ASSERT_SOME(session.get());
+ server->expireSession(session.get().get());
+
+ AWAIT_READY(cancellation);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 5218286..2ddc65e 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -68,7 +68,7 @@ GroupProcess::GroupProcess(
: ZOO_OPEN_ACL_UNSAFE),
watcher(NULL),
zk(NULL),
- state(DISCONNECTED),
+ state(CONNECTING),
retrying(false)
{}
@@ -87,7 +87,7 @@ GroupProcess::GroupProcess(
: ZOO_OPEN_ACL_UNSAFE),
watcher(NULL),
zk(NULL),
- state(DISCONNECTED),
+ state(CONNECTING),
retrying(false)
{}
@@ -116,9 +116,7 @@ void GroupProcess::initialize()
Future<Group::Membership> GroupProcess::join(const string& data)
{
- if (error.isSome()) {
- return Failure(error.get());
- } else if (state != CONNECTED) {
+ if (state != READY) {
Join* join = new Join(data);
pending.joins.push(join);
return join->promise.future();
@@ -152,9 +150,7 @@ Future<Group::Membership> GroupProcess::join(const string& data)
Future<bool> GroupProcess::cancel(const Group::Membership& membership)
{
- if (error.isSome()) {
- return Failure(error.get());
- } else if (owned.count(membership.id()) == 0) {
+ if (owned.count(membership.id()) == 0) {
// TODO(benh): Should this be an error? Right now a user can't
// differentiate when 'false' means they can't cancel because it's
// not owned or because it's already been cancelled (explicitly by
@@ -163,7 +159,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
return false;
}
- if (state != CONNECTED) {
+ if (state != READY) {
Cancel* cancel = new Cancel(membership);
pending.cancels.push(cancel);
return cancel->promise.future();
@@ -193,9 +189,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
Future<string> GroupProcess::data(const Group::Membership& membership)
{
- if (error.isSome()) {
- return Failure(error.get());
- } else if (state != CONNECTED) {
+ if (state != READY) {
Data* data = new Data(membership);
pending.datas.push(data);
return data->promise.future();
@@ -222,9 +216,7 @@ Future<string> GroupProcess::data(const Group::Membership& membership)
Future<set<Group::Membership> > GroupProcess::watch(
const set<Group::Membership>& expected)
{
- if (error.isSome()) {
- return Failure(error.get());
- } else if (state != CONNECTED) {
+ if (state != READY) {
Watch* watch = new Watch(expected);
pending.watches.push(watch);
return watch->promise.future();
@@ -241,17 +233,29 @@ Future<set<Group::Membership> > GroupProcess::watch(
// membership "roll call" for each watch in order to make sure all
// causal relationships are satisfied.
- memberships.isSome() || cache();
-
- if (memberships.isNone()) { // Try again later.
- if (!retrying) {
- delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
- retrying = true;
+ if (memberships.isNone()) {
+ Try<bool> cached = cache();
+
+ if (cached.isError()) {
+ // Non-retryable error.
+ return Failure(cached.error());
+ } else if (!cached.get()) {
+ CHECK(memberships.isNone());
+
+ // Try again later.
+ if (!retrying) {
+ delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+ retrying = true;
+ }
+ Watch* watch = new Watch(expected);
+ pending.watches.push(watch);
+ return watch->promise.future();
}
- Watch* watch = new Watch(expected);
- pending.watches.push(watch);
- return watch->promise.future();
- } else if (memberships.get() == expected) { // Just wait for updates.
+ }
+
+ CHECK_SOME(memberships);
+
+ if (memberships.get() == expected) { // Just wait for updates.
Watch* watch = new Watch(expected);
pending.watches.push(watch);
return watch->promise.future();
@@ -263,11 +267,7 @@ Future<set<Group::Membership> > GroupProcess::watch(
Future<Option<int64_t> > GroupProcess::session()
{
- if (error.isSome()) {
- Promise<Option<int64_t> > promise;
- promise.fail(error.get());
- return promise.future();
- } else if (state != CONNECTED) {
+ if (state == CONNECTING) {
return None();
}
@@ -277,64 +277,87 @@ Future<Option<int64_t> > GroupProcess::session()
void GroupProcess::connected(bool reconnect)
{
- if (!reconnect) {
- // Authenticate if necessary (and we are connected for the first
- // time, or after a session expiration).
- if (auth.isSome()) {
- LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+ LOG(INFO) << "Group process (" << self() << ") "
+ << (reconnect ? "reconnected" : "connected") << " to ZooKeeper";
- int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
+ state = CONNECTED;
- if (code != ZOK) { // TODO(benh): Authentication retries?
- error = "Failed to authenticate with ZooKeeper: " + zk->message(code);
- abort(); // Cancels everything pending.
- return;
- }
- }
+ // Cancel and cleanup the reconnect timer (if necessary).
+ if (timer.isSome()) {
+ Timer::cancel(timer.get());
+ timer = None();
+ }
- // Create znode path (including intermediate znodes) as necessary.
- CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
-
- LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
-
- int code = zk->create(znode, "", acl, 0, NULL, true);
-
- // We fail all non-OK return codes except ZNONODEEXISTS (since
- // that means the path we were trying to create exists) and
- // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
- // don't allow us to create a child znode but we are allowed to
- // create children of 'znode' itself, which will be determined
- // when we first do a Group::join). Note that it's also possible
- // we got back a ZNONODE because we could not create one of the
- // intermediate znodes (in which case we'll abort in the 'else'
- // below since ZNONODE is non-retryable). TODO(benh): Need to
- // check that we also can put a watch on the children of 'znode'.
- if (code == ZINVALIDSTATE ||
- (code != ZOK &&
- code != ZNODEEXISTS &&
- code != ZNOAUTH &&
- zk->retryable(code))) {
- CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
- return; // Try again later.
- } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
- error =
- "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code);
- abort(); // Cancels everything pending.
- return;
+ // Sync group operations (and set up the group on ZK).
+ Try<bool> synced = sync();
+
+ if (synced.isError()) {
+ // Non-retryable error. Abort.
+ abort(synced.error());
+ } else if (!synced.get()) {
+ // Retryable error.
+ if (!retrying) {
+ delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+ retrying = true;
}
- } else {
- LOG(INFO) << "Group process (" << self() << ") reconnected to Zookeeper";
+ }
+}
+
+
+Try<bool> GroupProcess::authenticate()
+{
+ CHECK_EQ(state, CONNECTED);
+
+ // Authenticate if necessary.
+ if (auth.isSome()) {
+ LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+
+ int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
- // Cancel and cleanup the reconnect timer (if necessary).
- if (timer.isSome()) {
- Timer::cancel(timer.get());
- timer = None();
+ if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ return false;
+ } else if (code != ZOK) {
+ return Error(
+ "Failed to authenticate with ZooKeeper: " + zk->message(code));
}
}
- state = CONNECTED;
+ state = AUTHENTICATED;
+ return true;
+}
+
+
+Try<bool> GroupProcess::create()
+{
+ CHECK_EQ(state, AUTHENTICATED);
+
+ // Create znode path (including intermediate znodes) as necessary.
+ CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+
+ LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
+
+ int code = zk->create(znode, "", acl, 0, NULL, true);
- sync(); // Handle pending (and cache memberships).
+ // We fail all non-retryable return codes except ZNODEEXISTS
+ // (since that means the path we were trying to create exists) and
+ // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
+ // don't allow us to create a child znode but we are allowed to
+ // create children of 'znode' itself, which will be determined
+ // when we first do a Group::join). Note that it's also possible
+ // we got back a ZNONODE because we could not create one of the
+ // intermediate znodes (in which case we return an Error in the
+ // 'else if' below since ZNONODE is non-retryable). TODO(benh): Need
+ // to check that we also can put a watch on the children of 'znode'.
+ if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+ CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
+ return false;
+ } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
+ return Error(
+ "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code));
+ }
+
+ state = READY;
+ return true;
}
@@ -369,8 +392,7 @@ void GroupProcess::timedout(const int64_t& sessionId)
std::ostringstream error_;
error_ << "Timed out waiting to reconnect to ZooKeeper (sessionId="
<< std::hex << sessionId << ")";
- error = error_.str();
- abort();
+ abort(error_.str());
}
}
@@ -400,24 +422,27 @@ void GroupProcess::expired()
// then. We could imagine doing this for owned memberships too, but
// for now we proactively cancel them above.
- state = DISCONNECTED;
+ state = CONNECTING;
delete CHECK_NOTNULL(zk);
delete CHECK_NOTNULL(watcher);
watcher = new ProcessWatcher<GroupProcess>(self());
zk = new ZooKeeper(servers, timeout, watcher);
-
- state = CONNECTING;
}
void GroupProcess::updated(const string& path)
{
- CHECK(znode == path);
+ CHECK_EQ(znode, path);
+
+ Try<bool> cached = cache(); // Update cache (will invalidate first).
- cache(); // Update cache (will invalidate first).
+ if (cached.isError()) {
+ abort(cached.error()); // Cancel everything pending.
+ } else if (!cached.get()) {
+ CHECK(memberships.isNone());
- if (memberships.isNone()) { // Something changed so we must try again later.
+ // Try again later.
if (!retrying) {
delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
retrying = true;
@@ -442,8 +467,7 @@ void GroupProcess::deleted(const string& path)
Result<Group::Membership> GroupProcess::doJoin(const string& data)
{
- CHECK(error.isNone()) << ": " << error.get();
- CHECK(state == CONNECTED);
+ CHECK_EQ(state, READY);
// Create a new ephemeral node to represent a new member and use the
// the specified data as it's contents.
@@ -453,7 +477,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
- CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None();
} else if (code != ZOK) {
return Error(
@@ -484,8 +508,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
{
- CHECK(error.isNone()) << ": " << error.get();
- CHECK(state == CONNECTED);
+ CHECK_EQ(state, READY);
Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
@@ -499,7 +522,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
int code = zk->remove(path, -1);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
- CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None();
} else if (code == ZNONODE) {
// This can happen because the membership could have expired but
@@ -528,8 +551,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
Result<string> GroupProcess::doData(const Group::Membership& membership)
{
- CHECK(error.isNone()) << ": " << error.get();
- CHECK(state == CONNECTED);
+ CHECK_EQ(state, READY);
Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
@@ -545,7 +567,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
int code = zk->get(path, false, &result, NULL);
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
- CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return None();
} else if (code != ZOK) {
return Error(
@@ -557,7 +579,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
}
-bool GroupProcess::cache()
+Try<bool> GroupProcess::cache()
{
// Invalidate first (if it's not already).
memberships = None();
@@ -568,14 +590,11 @@ bool GroupProcess::cache()
int code = zk->getChildren(znode, true, &results); // Sets the watch!
if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
- CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+ CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
return false;
} else if (code != ZOK) {
- error =
- "Non-retryable error attempting to get children of '" + znode + "'"
- " in ZooKeeper: " + zk->message(code);
- abort(); // Cancels everything pending.
- return false;
+ return Error("Non-retryable error attempting to get children of '" + znode +
+ "' in ZooKeeper: " + zk->message(code));
}
// Convert results to sequence numbers.
@@ -651,10 +670,30 @@ void GroupProcess::update()
}
-bool GroupProcess::sync()
+Try<bool> GroupProcess::sync()
{
- CHECK(error.isNone()) << ": " << error.get();
- CHECK(state == CONNECTED);
+ LOG(INFO)
+ << "Syncing group operations: queue size (joins, cancels, datas) = ("
+ << pending.joins.size() << ", " << pending.cancels.size() << ", "
+ << pending.datas.size() << ")";
+
+ CHECK_NE(state, CONNECTING);
+
+ // Authenticate with ZK if not already authenticated.
+ if (state == CONNECTED) {
+ Try<bool> authenticated = authenticate();
+ if (authenticated.isError() || !authenticated.get()) {
+ return authenticated;
+ }
+ }
+
+ // Create group base path if not already created.
+ if (state == AUTHENTICATED) {
+ Try<bool> created = create();
+ if (created.isError() || !created.get()) {
+ return created;
+ }
+ }
// Do joins.
while (!pending.joins.empty()) {
@@ -709,8 +748,10 @@ bool GroupProcess::sync()
// cancels first through any explicit futures for them rather than
// watches.
if (memberships.isNone()) {
- if (!cache()) {
- return false; // Try again later (if no error).
+ Try<bool> cached = cache();
+ if (cached.isError() || !cached.get()) {
+ CHECK(memberships.isNone());
+ return cached;
} else {
update(); // Update any pending watches.
}
@@ -722,31 +763,38 @@ bool GroupProcess::sync()
void GroupProcess::retry(const Duration& duration)
{
- if (error.isSome() || state != CONNECTED) {
- retrying = false; // Stop retrying, we'll sync at reconnect (if no error).
- } else if (error.isNone() && state == CONNECTED) {
- bool synced = sync(); // Might get another retryable error.
- if (!synced && error.isNone()) {
- // Backoff.
- Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
- delay(seconds, self(), &GroupProcess::retry, seconds);
- } else {
- retrying = false;
- }
+ CHECK(retrying);
+
+ // Will reset it to true if another retry is necessary.
+ retrying = false;
+
+ if (state == CONNECTING) {
+ // The delayed retry can be invoked while group is trying to
+ // (re)connect to ZK. In this case we directly return and we will
+ // sync when group is connected to ZK.
+ return;
}
-}
+ Try<bool> synced = sync();
-void GroupProcess::abort()
-{
- CHECK_SOME(error);
+ if (synced.isError()) {
+ // Non-retryable error. Abort.
+ abort(synced.error());
+ } else if (!synced.get()) {
+ // Backoff and keep retrying.
+ retrying = true;
+ Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
+ delay(seconds, self(), &GroupProcess::retry, seconds);
+ }
+}
- fail(&pending.joins, error.get());
- fail(&pending.cancels, error.get());
- fail(&pending.datas, error.get());
- fail(&pending.watches, error.get());
- error = None();
+void GroupProcess::abort(const string& error)
+{
+ fail(&pending.joins, error);
+ fail(&pending.cancels, error);
+ fail(&pending.datas, error);
+ fail(&pending.watches, error);
// If we decide to abort, make sure we expire the session
// (cleaning up any ephemeral ZNodes as necessary). We also
http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 04068e3..27b2ee9 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -171,15 +171,28 @@ private:
Result<bool> doCancel(const Group::Membership& membership);
Result<std::string> doData(const Group::Membership& membership);
- // Attempts to cache the current set of memberships.
- bool cache();
+ // Returns true if authentication is successful, false if the
+ // failure is retryable and Error otherwise.
+ Try<bool> authenticate();
- // Updates any pending watches.
- void update();
+ // Creates the group (which means creating its base path) on ZK.
+ // Returns true if successful, false if the failure is retryable
+ // and Error otherwise.
+ Try<bool> create();
+
+ // Attempts to cache the current set of memberships.
+ // Returns true if successful, false if the failure is retryable
+ // and Error otherwise.
+ Try<bool> cache();
// Synchronizes pending operations with ZooKeeper and also attempts
// to cache the current set of memberships if necessary.
- bool sync();
+ // Returns true if successful, false if the failure is retryable
+ // and Error otherwise.
+ Try<bool> sync();
+
+ // Updates any pending watches.
+ void update();
// Generic retry method. This mechanism is "generic" in the sense
// that it is not specific to any particular operation, but rather
@@ -188,12 +201,10 @@ private:
void retry(const Duration& duration);
// Fails all pending operations.
- void abort();
+ void abort(const std::string& error);
void timedout(const int64_t& sessionId);
- Option<std::string> error; // Potential non-retryable error.
-
const std::string servers;
const Duration timeout;
const std::string znode;
@@ -205,10 +216,12 @@ private:
Watcher* watcher;
ZooKeeper* zk;
- enum State { // ZooKeeper connection state.
- DISCONNECTED,
- CONNECTING,
- CONNECTED,
+ enum State { // Group connection state.
+ CONNECTING, // ZooKeeper connecting.
+ CONNECTED, // ZooKeeper connected but before group setup.
+ AUTHENTICATED, // ZooKeeper connected and authenticated.
+ READY, // ZooKeeper connected, session authenticated and
+ // base path for the group created.
} state;
struct Join
@@ -249,6 +262,7 @@ private:
std::queue<Watch*> watches;
} pending;
+ // Indicates there is a pending delayed retry.
bool retrying;
// Expected ZooKeeper sequence numbers (either owned/created by this