You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/12/04 01:25:22 UTC

[3/4] git commit: Fixed Group to retry if authentication fails due to retryable errors.

Fixed Group to retry if authentication fails due to retryable errors.

See summary.

From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/15706


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb9edfd0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb9edfd0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb9edfd0

Branch: refs/heads/master
Commit: fb9edfd04ca4dfae5c916c78dedbce86215b0a74
Parents: 52c286d
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Dec 3 14:21:22 2013 -0800
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Tue Dec 3 14:21:22 2013 -0800

----------------------------------------------------------------------
 src/tests/group_tests.cpp |  60 ++++++++
 src/zookeeper/group.cpp   | 304 ++++++++++++++++++++++++-----------------
 src/zookeeper/group.hpp   |  38 ++++--
 3 files changed, 262 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/tests/group_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/group_tests.cpp b/src/tests/group_tests.cpp
index fb4c2f1..957256e 100644
--- a/src/tests/group_tests.cpp
+++ b/src/tests/group_tests.cpp
@@ -23,10 +23,12 @@
 #include <string>
 
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/gtest.hpp>
 
 #include <stout/gtest.hpp>
 #include <stout/option.hpp>
+#include <stout/os.hpp>
 
 #include "tests/zookeeper.hpp"
 
@@ -37,9 +39,11 @@ using namespace mesos::internal;
 using namespace mesos::internal::tests;
 
 using zookeeper::Group;
+using zookeeper::GroupProcess;
 
 using process::Future;
 
+using testing::_;
 
 class GroupTest : public ZooKeeperTest {};
 
@@ -310,3 +314,59 @@ TEST_F(GroupTest, GroupPathWithRestrictivePerms)
 
   AWAIT_READY(successGroup.join("succeed"));
 }
+
+
+// Verifies that the Group operations can recover from retryable
+// errors caused by session expiration. This test cannot guarantee
+// that retries occur; rather, it attempts to trigger them
+// probabilistically with repeated session expirations.
+TEST_F(GroupTest, RetryableErrors)
+{
+  Future<Nothing> connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+
+  zookeeper::Authentication auth("digest", "creator:creator");
+  Group group(server->connectString(), NO_TIMEOUT, "/test/", auth);
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  Future<Option<int64_t> > session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+
+  // We repeatedly expire the session while Group operations are
+  // on-going. This causes retries of authenticate() and group
+  // create().
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  Future<Group::Membership> membership = group.join("hello world");
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(membership);
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  connected = FUTURE_DISPATCH(_, &GroupProcess::connected);
+  server->expireSession(session.get().get());
+
+  Future<bool> cancellation = group.cancel(membership.get());
+
+  // Wait for Group to connect to get hold of the session.
+  AWAIT_READY(connected);
+  session = group.session();
+  AWAIT_READY(session);
+  ASSERT_SOME(session.get());
+  server->expireSession(session.get().get());
+
+  AWAIT_READY(cancellation);
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.cpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.cpp b/src/zookeeper/group.cpp
index 5218286..2ddc65e 100644
--- a/src/zookeeper/group.cpp
+++ b/src/zookeeper/group.cpp
@@ -68,7 +68,7 @@ GroupProcess::GroupProcess(
         : ZOO_OPEN_ACL_UNSAFE),
     watcher(NULL),
     zk(NULL),
-    state(DISCONNECTED),
+    state(CONNECTING),
     retrying(false)
 {}
 
@@ -87,7 +87,7 @@ GroupProcess::GroupProcess(
         : ZOO_OPEN_ACL_UNSAFE),
     watcher(NULL),
     zk(NULL),
-    state(DISCONNECTED),
+    state(CONNECTING),
     retrying(false)
 {}
 
@@ -116,9 +116,7 @@ void GroupProcess::initialize()
 
 Future<Group::Membership> GroupProcess::join(const string& data)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Join* join = new Join(data);
     pending.joins.push(join);
     return join->promise.future();
@@ -152,9 +150,7 @@ Future<Group::Membership> GroupProcess::join(const string& data)
 
 Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (owned.count(membership.id()) == 0) {
+  if (owned.count(membership.id()) == 0) {
     // TODO(benh): Should this be an error? Right now a user can't
     // differentiate when 'false' means they can't cancel because it's
     // not owned or because it's already been cancelled (explicitly by
@@ -163,7 +159,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
     return false;
   }
 
-  if (state != CONNECTED) {
+  if (state != READY) {
     Cancel* cancel = new Cancel(membership);
     pending.cancels.push(cancel);
     return cancel->promise.future();
@@ -193,9 +189,7 @@ Future<bool> GroupProcess::cancel(const Group::Membership& membership)
 
 Future<string> GroupProcess::data(const Group::Membership& membership)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Data* data = new Data(membership);
     pending.datas.push(data);
     return data->promise.future();
@@ -222,9 +216,7 @@ Future<string> GroupProcess::data(const Group::Membership& membership)
 Future<set<Group::Membership> > GroupProcess::watch(
     const set<Group::Membership>& expected)
 {
-  if (error.isSome()) {
-    return Failure(error.get());
-  } else if (state != CONNECTED) {
+  if (state != READY) {
     Watch* watch = new Watch(expected);
     pending.watches.push(watch);
     return watch->promise.future();
@@ -241,17 +233,29 @@ Future<set<Group::Membership> > GroupProcess::watch(
   // membership "roll call" for each watch in order to make sure all
   // causal relationships are satisfied.
 
-  memberships.isSome() || cache();
-
-  if (memberships.isNone()) { // Try again later.
-    if (!retrying) {
-      delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
-      retrying = true;
+  if (memberships.isNone()) {
+    Try<bool> cached = cache();
+
+    if (cached.isError()) {
+      // Non-retryable error.
+      return Failure(cached.error());
+    } else if (!cached.get()) {
+      CHECK(memberships.isNone());
+
+      // Try again later.
+      if (!retrying) {
+        delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+        retrying = true;
+      }
+      Watch* watch = new Watch(expected);
+      pending.watches.push(watch);
+      return watch->promise.future();
     }
-    Watch* watch = new Watch(expected);
-    pending.watches.push(watch);
-    return watch->promise.future();
-  } else if (memberships.get() == expected) { // Just wait for updates.
+  }
+
+  CHECK_SOME(memberships);
+
+  if (memberships.get() == expected) { // Just wait for updates.
     Watch* watch = new Watch(expected);
     pending.watches.push(watch);
     return watch->promise.future();
@@ -263,11 +267,7 @@ Future<set<Group::Membership> > GroupProcess::watch(
 
 Future<Option<int64_t> > GroupProcess::session()
 {
-  if (error.isSome()) {
-    Promise<Option<int64_t> > promise;
-    promise.fail(error.get());
-    return promise.future();
-  } else if (state != CONNECTED) {
+  if (state == CONNECTING) {
     return None();
   }
 
@@ -277,64 +277,87 @@ Future<Option<int64_t> > GroupProcess::session()
 
 void GroupProcess::connected(bool reconnect)
 {
-  if (!reconnect) {
-    // Authenticate if necessary (and we are connected for the first
-    // time, or after a session expiration).
-    if (auth.isSome()) {
-      LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+  LOG(INFO) << "Group process (" << self() << ") "
+            << (reconnect ? "reconnected" : "connected") << " to ZooKeeper";
 
-      int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
+  state = CONNECTED;
 
-      if (code != ZOK) { // TODO(benh): Authentication retries?
-        error = "Failed to authenticate with ZooKeeper: " + zk->message(code);
-        abort(); // Cancels everything pending.
-        return;
-      }
-    }
+  // Cancel and cleanup the reconnect timer (if necessary).
+  if (timer.isSome()) {
+    Timer::cancel(timer.get());
+    timer = None();
+  }
 
-    // Create znode path (including intermediate znodes) as necessary.
-    CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
-
-    LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
-
-    int code = zk->create(znode, "", acl, 0, NULL, true);
-
-    // We fail all non-OK return codes except ZNONODEEXISTS (since
-    // that means the path we were trying to create exists) and
-    // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
-    // don't allow us to create a child znode but we are allowed to
-    // create children of 'znode' itself, which will be determined
-    // when we first do a Group::join). Note that it's also possible
-    // we got back a ZNONODE because we could not create one of the
-    // intermediate znodes (in which case we'll abort in the 'else'
-    // below since ZNONODE is non-retryable). TODO(benh): Need to
-    // check that we also can put a watch on the children of 'znode'.
-    if (code == ZINVALIDSTATE ||
-        (code != ZOK &&
-         code != ZNODEEXISTS &&
-         code != ZNOAUTH &&
-         zk->retryable(code))) {
-      CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
-      return; // Try again later.
-    } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
-      error =
-        "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code);
-      abort(); // Cancels everything pending.
-      return;
+  // Sync group operations (and set up the group on ZK).
+  Try<bool> synced = sync();
+
+  if (synced.isError()) {
+    // Non-retryable error. Abort.
+    abort(synced.error());
+  } else if (!synced.get()) {
+    // Retryable error.
+    if (!retrying) {
+      delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
+      retrying = true;
     }
-  } else {
-    LOG(INFO) << "Group process (" << self() << ")  reconnected to Zookeeper";
+  }
+}
+
+
+Try<bool> GroupProcess::authenticate()
+{
+  CHECK_EQ(state, CONNECTED);
+
+  // Authenticate if necessary.
+  if (auth.isSome()) {
+    LOG(INFO) << "Authenticating with ZooKeeper using " << auth.get().scheme;
+
+    int code = zk->authenticate(auth.get().scheme, auth.get().credentials);
 
-    // Cancel and cleanup the reconnect timer (if necessary).
-    if (timer.isSome()) {
-      Timer::cancel(timer.get());
-      timer = None();
+    if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+      return false;
+    } else if (code != ZOK) {
+      return Error(
+          "Failed to authenticate with ZooKeeper: " + zk->message(code));
     }
   }
 
-  state = CONNECTED;
+  state = AUTHENTICATED;
+  return true;
+}
+
+
+Try<bool> GroupProcess::create()
+{
+  CHECK_EQ(state, AUTHENTICATED);
+
+  // Create znode path (including intermediate znodes) as necessary.
+  CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+
+  LOG(INFO) << "Trying to create path '" << znode << "' in ZooKeeper";
+
+  int code = zk->create(znode, "", acl, 0, NULL, true);
 
-  sync(); // Handle pending (and cache memberships).
+  // We fail all non-retryable return codes except ZNODEEXISTS
+  // (since that means the path we were trying to create exists) and
+  // ZNOAUTH (since it's possible that the ACLs on 'dirname(znode)'
+  // don't allow us to create a child znode but we are allowed to
+  // create children of 'znode' itself, which will be determined
+  // when we first do a Group::join). Note that it's also possible
+  // we got back a ZNONODE because we could not create one of the
+  // intermediate znodes (in which case we'll abort in the 'else'
+  // below since ZNONODE is non-retryable). TODO(benh): Need to
+  // check that we also can put a watch on the children of 'znode'.
+  if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
+    return false;
+  } else if (code != ZOK && code != ZNODEEXISTS && code != ZNOAUTH) {
+    return Error(
+        "Failed to create '" + znode + "' in ZooKeeper: " + zk->message(code));
+  }
+
+  state = READY;
+  return true;
 }
 
 
@@ -369,8 +392,7 @@ void GroupProcess::timedout(const int64_t& sessionId)
     std::ostringstream error_;
     error_ << "Timed out waiting to reconnect to ZooKeeper (sessionId="
            << std::hex << sessionId << ")";
-    error = error_.str();
-    abort();
+    abort(error_.str());
   }
 }
 
@@ -400,24 +422,27 @@ void GroupProcess::expired()
   // then. We could imagine doing this for owned memberships too, but
   // for now we proactively cancel them above.
 
-  state = DISCONNECTED;
+  state = CONNECTING;
 
   delete CHECK_NOTNULL(zk);
   delete CHECK_NOTNULL(watcher);
   watcher = new ProcessWatcher<GroupProcess>(self());
   zk = new ZooKeeper(servers, timeout, watcher);
-
-  state = CONNECTING;
 }
 
 
 void GroupProcess::updated(const string& path)
 {
-  CHECK(znode == path);
+  CHECK_EQ(znode, path);
+
+  Try<bool> cached = cache(); // Update cache (will invalidate first).
 
-  cache(); // Update cache (will invalidate first).
+  if (cached.isError()) {
+    abort(cached.error()); // Cancel everything pending.
+  } else if (!cached.get()) {
+    CHECK(memberships.isNone());
 
-  if (memberships.isNone()) { // Something changed so we must try again later.
+    // Try again later.
     if (!retrying) {
       delay(RETRY_INTERVAL, self(), &GroupProcess::retry, RETRY_INTERVAL);
       retrying = true;
@@ -442,8 +467,7 @@ void GroupProcess::deleted(const string& path)
 
 Result<Group::Membership> GroupProcess::doJoin(const string& data)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   // Create a new ephemeral node to represent a new member and use the
   // the specified data as it's contents.
@@ -453,7 +477,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
                         ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code != ZOK) {
     return Error(
@@ -484,8 +508,7 @@ Result<Group::Membership> GroupProcess::doJoin(const string& data)
 
 Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
 
@@ -499,7 +522,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
   int code = zk->remove(path, -1);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code == ZNONODE) {
     // This can happen because the membership could have expired but
@@ -528,8 +551,7 @@ Result<bool> GroupProcess::doCancel(const Group::Membership& membership)
 
 Result<string> GroupProcess::doData(const Group::Membership& membership)
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  CHECK_EQ(state, READY);
 
   Try<string> sequence = strings::format("%.*d", 10, membership.sequence);
 
@@ -545,7 +567,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
   int code = zk->get(path, false, &result, NULL);
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return None();
   } else if (code != ZOK) {
     return Error(
@@ -557,7 +579,7 @@ Result<string> GroupProcess::doData(const Group::Membership& membership)
 }
 
 
-bool GroupProcess::cache()
+Try<bool> GroupProcess::cache()
 {
   // Invalidate first (if it's not already).
   memberships = None();
@@ -568,14 +590,11 @@ bool GroupProcess::cache()
   int code = zk->getChildren(znode, true, &results); // Sets the watch!
 
   if (code == ZINVALIDSTATE || (code != ZOK && zk->retryable(code))) {
-    CHECK(zk->getState() != ZOO_AUTH_FAILED_STATE);
+    CHECK_NE(zk->getState(), ZOO_AUTH_FAILED_STATE);
     return false;
   } else if (code != ZOK) {
-    error =
-      "Non-retryable error attempting to get children of '" + znode + "'"
-      " in ZooKeeper: " + zk->message(code);
-    abort(); // Cancels everything pending.
-    return false;
+    return Error("Non-retryable error attempting to get children of '" + znode +
+                 "' in ZooKeeper: " + zk->message(code));
   }
 
   // Convert results to sequence numbers.
@@ -651,10 +670,30 @@ void GroupProcess::update()
 }
 
 
-bool GroupProcess::sync()
+Try<bool> GroupProcess::sync()
 {
-  CHECK(error.isNone()) << ": " << error.get();
-  CHECK(state == CONNECTED);
+  LOG(INFO)
+    << "Syncing group operations: queue size (joins, cancels, datas) = ("
+    << pending.joins.size() << ", " << pending.cancels.size() << ", "
+    << pending.datas.size() << ")";
+
+  CHECK_NE(state, CONNECTING);
+
+  // Authenticate with ZK if not already authenticated.
+  if (state == CONNECTED) {
+    Try<bool> authenticated = authenticate();
+    if (authenticated.isError() || !authenticated.get()) {
+      return authenticated;
+    }
+  }
+
+  // Create group base path if not already created.
+  if (state == AUTHENTICATED) {
+    Try<bool> created = create();
+    if (created.isError() || !created.get()) {
+      return created;
+    }
+  }
 
   // Do joins.
   while (!pending.joins.empty()) {
@@ -709,8 +748,10 @@ bool GroupProcess::sync()
   // cancels first through any explicit futures for them rather than
   // watches.
   if (memberships.isNone()) {
-    if (!cache()) {
-      return false; // Try again later (if no error).
+    Try<bool> cached = cache();
+    if (cached.isError() || !cached.get()) {
+      CHECK(memberships.isNone());
+      return cached;
     } else {
       update(); // Update any pending watches.
     }
@@ -722,31 +763,38 @@ bool GroupProcess::sync()
 
 void GroupProcess::retry(const Duration& duration)
 {
-  if (error.isSome() || state != CONNECTED) {
-    retrying = false; // Stop retrying, we'll sync at reconnect (if no error).
-  } else if (error.isNone() && state == CONNECTED) {
-    bool synced = sync(); // Might get another retryable error.
-    if (!synced && error.isNone()) {
-      // Backoff.
-      Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
-      delay(seconds, self(), &GroupProcess::retry, seconds);
-    } else {
-      retrying = false;
-    }
+  CHECK(retrying);
+
+  // Will reset it to true if another retry is necessary.
+  retrying = false;
+
+  if (state == CONNECTING) {
+    // The delayed retry can be invoked while group is trying to
+    // (re)connect to ZK. In this case we directly return and we will
+    // sync when group is connected to ZK.
+    return;
   }
-}
 
+  Try<bool> synced = sync();
 
-void GroupProcess::abort()
-{
-  CHECK_SOME(error);
+  if (synced.isError()) {
+    // Non-retryable error. Abort.
+    abort(synced.error());
+  } else if (!synced.get()) {
+    // Backoff and keep retrying.
+    retrying = true;
+    Seconds seconds = std::min(duration * 2, Duration(Seconds(60)));
+    delay(seconds, self(), &GroupProcess::retry, seconds);
+  }
+}
 
-  fail(&pending.joins, error.get());
-  fail(&pending.cancels, error.get());
-  fail(&pending.datas, error.get());
-  fail(&pending.watches, error.get());
 
-  error = None();
+void GroupProcess::abort(const string& error)
+{
+  fail(&pending.joins, error);
+  fail(&pending.cancels, error);
+  fail(&pending.datas, error);
+  fail(&pending.watches, error);
 
   // If we decide to abort, make sure we expire the session
   // (cleaning up any ephemeral ZNodes as necessary). We also

http://git-wip-us.apache.org/repos/asf/mesos/blob/fb9edfd0/src/zookeeper/group.hpp
----------------------------------------------------------------------
diff --git a/src/zookeeper/group.hpp b/src/zookeeper/group.hpp
index 04068e3..27b2ee9 100644
--- a/src/zookeeper/group.hpp
+++ b/src/zookeeper/group.hpp
@@ -171,15 +171,28 @@ private:
   Result<bool> doCancel(const Group::Membership& membership);
   Result<std::string> doData(const Group::Membership& membership);
 
-  // Attempts to cache the current set of memberships.
-  bool cache();
+  // Returns true if authentication is successful, false if the
+  // failure is retryable and Error otherwise.
+  Try<bool> authenticate();
 
-  // Updates any pending watches.
-  void update();
+  // Creates the group (which means creating its base path) on ZK.
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> create();
+
+  // Attempts to cache the current set of memberships.
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> cache();
 
   // Synchronizes pending operations with ZooKeeper and also attempts
   // to cache the current set of memberships if necessary.
-  bool sync();
+  // Returns true if successful, false if the failure is retryable
+  // and Error otherwise.
+  Try<bool> sync();
+
+  // Updates any pending watches.
+  void update();
 
   // Generic retry method. This mechanism is "generic" in the sense
   // that it is not specific to any particular operation, but rather
@@ -188,12 +201,10 @@ private:
   void retry(const Duration& duration);
 
   // Fails all pending operations.
-  void abort();
+  void abort(const std::string& error);
 
   void timedout(const int64_t& sessionId);
 
-  Option<std::string> error; // Potential non-retryable error.
-
   const std::string servers;
   const Duration timeout;
   const std::string znode;
@@ -205,10 +216,12 @@ private:
   Watcher* watcher;
   ZooKeeper* zk;
 
-  enum State { // ZooKeeper connection state.
-    DISCONNECTED,
-    CONNECTING,
-    CONNECTED,
+  enum State {     // Group connection state.
+    CONNECTING,    // ZooKeeper connecting.
+    CONNECTED,     // ZooKeeper connected but before group setup.
+    AUTHENTICATED, // ZooKeeper connected and authenticated.
+    READY,         // ZooKeeper connected, session authenticated and
+                   // base path for the group created.
   } state;
 
   struct Join
@@ -249,6 +262,7 @@ private:
     std::queue<Watch*> watches;
   } pending;
 
+  // Indicates there is a pending delayed retry.
   bool retrying;
 
   // Expected ZooKeeper sequence numbers (either owned/created by this