Posted to commits@mesos.apache.org by ji...@apache.org on 2014/04/23 02:27:43 UTC

git commit: Allowed replicated log to do auto initialization.

Repository: mesos
Updated Branches:
  refs/heads/master d6f51c5a2 -> 27221fd51


Allowed replicated log to do auto initialization.

Review: https://reviews.apache.org/r/18600
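
For context, here is a minimal call-site sketch showing how a user of the Log class could opt into the new behavior, based on the updated constructor in src/log/log.hpp below. The quorum size, file path, and PID set are illustrative, and the include path and namespace are assumed from the surrounding sources rather than stated in this commit:

  #include <set>

  #include <process/pid.hpp>

  #include "log/log.hpp"

  using namespace mesos::internal::log;

  int main()
  {
    // PIDs of the other replicas in the network (left empty here for brevity).
    std::set<process::UPID> pids;

    // Quorum of 2 (auto-initialization assumes the cluster has
    // 2 * 2 - 1 = 3 replicas), backed by a local file, with
    // auto-initialization enabled so that a cluster in which every
    // replica is EMPTY can bootstrap itself without an explicit
    // initialization tool.
    Log log(2, "/tmp/replicated_log", pids, true);

    return 0;
  }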


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/27221fd5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/27221fd5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/27221fd5

Branch: refs/heads/master
Commit: 27221fd512bfd4b2ad85a1e4fcb6c7954df87d9b
Parents: d6f51c5
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Apr 4 11:26:07 2014 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Apr 22 17:26:53 2014 -0700

----------------------------------------------------------------------
 src/log/log.cpp         |  47 ++++++++--
 src/log/log.hpp         |   6 +-
 src/log/recover.cpp     | 209 +++++++++++++++++++++++++++++++++++++------
 src/log/recover.hpp     |   9 +-
 src/tests/log_tests.cpp | 138 ++++++++++++++++++++++++++++
 5 files changed, 371 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/log.cpp
----------------------------------------------------------------------
diff --git a/src/log/log.cpp b/src/log/log.cpp
index 9dd992f..ca97144 100644
--- a/src/log/log.cpp
+++ b/src/log/log.cpp
@@ -55,7 +55,8 @@ public:
   LogProcess(
       size_t _quorum,
       const string& path,
-      const set<UPID>& pids);
+      const set<UPID>& pids,
+      bool _autoInitialize);
 
   LogProcess(
       size_t _quorum,
@@ -63,7 +64,8 @@ public:
       const string& servers,
       const Duration& timeout,
       const string& znode,
-      const Option<zookeeper::Authentication>& auth);
+      const Option<zookeeper::Authentication>& auth,
+      bool _autoInitialize);
 
   // Recovers the log by catching up if needed. Returns a shared
   // pointer to the local replica if the recovery succeeds.
@@ -91,6 +93,7 @@ private:
   const size_t quorum;
   Shared<Replica> replica;
   Shared<Network> network;
+  const bool autoInitialize;
 
   // For replica recovery.
   Option<Future<Owned<Replica> > > recovering;
@@ -197,11 +200,13 @@ private:
 LogProcess::LogProcess(
     size_t _quorum,
     const string& path,
-    const set<UPID>& pids)
+    const set<UPID>& pids,
+    bool _autoInitialize)
   : ProcessBase(ID::generate("log")),
     quorum(_quorum),
     replica(new Replica(path)),
     network(new Network(pids + (UPID) replica->pid())),
+    autoInitialize(_autoInitialize),
     group(NULL) {}
 
 
@@ -211,11 +216,13 @@ LogProcess::LogProcess(
     const string& servers,
     const Duration& timeout,
     const string& znode,
-    const Option<zookeeper::Authentication>& auth)
+    const Option<zookeeper::Authentication>& auth,
+    bool _autoInitialize)
   : ProcessBase(ID::generate("log")),
     quorum(_quorum),
     replica(new Replica(path)),
     network(new ZooKeeperNetwork(servers, timeout, znode, auth)),
+    autoInitialize(_autoInitialize),
     group(new zookeeper::Group(servers, timeout, znode, auth)) {}
 
 
@@ -305,7 +312,12 @@ Future<Shared<Replica> > LogProcess::recover()
     // 'release' in Shared which will provide this CHECK internally.
     CHECK(replica.unique());
 
-    recovering = log::recover(quorum, replica.own().get(), network)
+    recovering =
+      log::recover(
+          quorum,
+          replica.own().get(),
+          network,
+          autoInitialize)
       .onAny(defer(self(), &Self::_recover));
   }
 
@@ -720,11 +732,18 @@ void LogWriterProcess::failed(const string& message, const string& reason)
 Log::Log(
     int quorum,
     const string& path,
-    const set<UPID>& pids)
+    const set<UPID>& pids,
+    bool autoInitialize)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  process = new LogProcess(quorum, path, pids);
+  process =
+    new LogProcess(
+        quorum,
+        path,
+        pids,
+        autoInitialize);
+
   spawn(process);
 }
 
@@ -734,11 +753,21 @@ Log::Log(
     const string& servers,
     const Duration& timeout,
     const string& znode,
-    const Option<zookeeper::Authentication>& auth)
+    const Option<zookeeper::Authentication>& auth,
+    bool autoInitialize)
 {
   GOOGLE_PROTOBUF_VERIFY_VERSION;
 
-  process = new LogProcess(quorum, path, servers, timeout, znode, auth);
+  process =
+    new LogProcess(
+        quorum,
+        path,
+        servers,
+        timeout,
+        znode,
+        auth,
+        autoInitialize);
+
   spawn(process);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 6787c80..7c905c7 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -189,7 +189,8 @@ public:
   // with other replicas via the set of process PIDs.
   Log(int quorum,
       const std::string& path,
-      const std::set<process::UPID>& pids);
+      const std::set<process::UPID>& pids,
+      bool autoInitialize = false);
 
   // Creates a new replicated log that assumes the specified quorum
   // size, is backed by a file at the specified path, and coordinates
@@ -200,7 +201,8 @@ public:
       const std::string& servers,
       const Duration& timeout,
       const std::string& znode,
-      const Option<zookeeper::Authentication>& auth = None());
+      const Option<zookeeper::Authentication>& auth = None(),
+      bool autoInitialize = false);
 
   ~Log();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index 688da5f..7c0566a 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -27,6 +27,7 @@
 #include <process/process.hpp>
 
 #include <stout/check.hpp>
+#include <stout/duration.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/lambda.hpp>
@@ -48,7 +49,6 @@ namespace mesos {
 namespace internal {
 namespace log {
 
-
 // This class is responsible for executing the log recover protocol.
 // Any time a replica in non-VOTING status starts, we will run this
 // protocol. We first broadcast a recover request to all the replicas
@@ -58,9 +58,18 @@ namespace log {
 //
 // A) Broadcast a RecoverRequest to all replicas in the network.
 // B) Collect RecoverResponse from each replica
-//   B1) If a quorum of replicas are found in VOTING status, the local
-//       replica will be in RECOVERING status next.
-//   B2) Otherwise, goto (A).
+//   B1) If a quorum of replicas are found in VOTING status (no matter
+//       what status the local replica is in currently), the local
+//       replica will be put in RECOVERING status next.
+//   B2) If the local replica is in EMPTY status and all replicas are
+//       found in either EMPTY status or STARTING status, the local
+//       replica will be put in STARTING status next.
+//   B3) If the local replica is in STARTING status and all replicas
+//       are found in either STARTING status or VOTING status, the
+//       local replica will be put in VOTING status next.
+//   B4) Otherwise, goto (A).
+//
+//   (B2 and B3 are used to do the two-phase auto initialization.)
 //
 // We re-use RecoverResponse to specify the return value. The 'status'
 // field specifies the next status of the local replica. If the next
@@ -71,10 +80,17 @@ class RecoverProtocolProcess : public Process<RecoverProtocolProcess>
 public:
   RecoverProtocolProcess(
       size_t _quorum,
-      const Shared<Network>& _network)
+      const Shared<Network>& _network,
+      const Metadata::Status& _status,
+      bool _autoInitialize,
+      const Duration& _timeout)
     : ProcessBase(ID::generate("log-recover-protocol")),
       quorum(_quorum),
-      network(_network) {}
+      network(_network),
+      status(_status),
+      autoInitialize(_autoInitialize),
+      timeout(_timeout),
+      terminating(false) {}
 
   Future<RecoverResponse> future() { return promise.future(); }
 
@@ -88,8 +104,25 @@ protected:
   }
 
 private:
+  static Future<Option<RecoverResponse> > timedout(
+      Future<Option<RecoverResponse> > future,
+      const Duration& timeout)
+  {
+    LOG(INFO) << "Unable to finish the recover protocol in "
+              << timeout << ", retrying";
+
+    future.discard();
+
+    // The 'future' will eventually become DISCARDED, at which time we
+    // will re-run the recover protocol. We use the boolean flag
+    // 'terminating' to distinguish between a user initiated discard
+    // and a timeout induced discard.
+    return future;
+  }
+
   void discard()
   {
+    terminating = true;
     chain.discard();
   }
 
@@ -100,6 +133,7 @@ private:
     chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
       .then(defer(self(), &Self::broadcast))
       .then(defer(self(), &Self::receive))
+      .after(timeout, lambda::bind(&Self::timedout, lambda::_1, timeout))
       .onAny(defer(self(), &Self::finished, lambda::_1));
   }
 
@@ -187,15 +221,88 @@ private:
       return result;
     }
 
+    if (autoInitialize) {
+      // The following code handles the auto-initialization. Our idea
+      // is: we allow a replica in EMPTY status to become VOTING
+      // immediately if it finds ALL (i.e., 2 * quorum - 1) replicas
+      // are in EMPTY status. This is based on the assumption that the
+      // only time ALL replicas are in EMPTY status is during start-up.
+      // This may not be true if we have a catastrophic failure in
+      // which all replicas are gone, and that's exactly the reason we
+      // allow users to disable auto-initialization.
+      //
+      // To do auto-initialization, if we use a single phase protocol
+      // and allow a replica to directly transit from EMPTY status to
+      // VOTING status, we may run into a state where we cannot make
+      // progress even if all replicas are in EMPTY status initially.
+      // For example, say the quorum size is 2. All replicas are in
+      // EMPTY status initially. One replica broadcasts a recover
+      // request and becomes VOTING before other replicas start
+      // broadcasting recover requests. In that case, no replica can
+      // make progress. To solve this problem, we use a two-phase
+      // protocol and introduce an intermediate transient status
+      // (STARTING) between EMPTY and VOTING status. A replica in
+      // EMPTY status can transit to STARTING status if it finds all
+      // replicas are in either EMPTY or STARTING status. A replica in
+      // STARTING status can transit to VOTING status if it finds all
+      // replicas are in either STARTING or VOTING status. In that
+      // way, in our previous example, all replicas will be in
+      // STARTING status before any of them can transit to VOTING
+      // status.
+
+      // TODO(jieyu): Currently, we simply calculate the size of the
+      // cluster from the quorum size. In the future, we may wanna
+      // allow users to specify the cluster size in case they want to
+      // use a non-standard quorum size (e.g., cluster size = 5,
+      // quorum size = 4).
+      size_t clusterSize = (2 * quorum) - 1;
+
+      switch (status) {
+        case Metadata::EMPTY:
+          if ((responsesReceived[Metadata::EMPTY] +
+               responsesReceived[Metadata::STARTING]) >= clusterSize) {
+            process::discard(responses);
+
+            RecoverResponse result;
+            result.set_status(Metadata::STARTING);
+
+            return result;
+          }
+          break;
+        case Metadata::STARTING:
+          if ((responsesReceived[Metadata::STARTING] +
+               responsesReceived[Metadata::VOTING]) >= clusterSize) {
+            process::discard(responses);
+
+            RecoverResponse result;
+            result.set_status(Metadata::VOTING);
+
+            return result;
+          }
+          break;
+        default:
+          // Ignore all other cases.
+          break;
+      }
+    }
+
     // Handle the next response.
     return receive();
   }
 
   void finished(const Future<Option<RecoverResponse> >& future)
   {
-    if (future.isDiscarded()) {
-      promise.discard();
-      terminate(self());
+    if (future.isDiscarded()) {
+      // We use the boolean flag 'terminating' to distinguish between
+      // a user initiated discard and a timeout induced discard. In
+      // the case of a user initiated discard, the flag 'terminating'
+      // will be set to true in 'Self::discard()'.
+      if (terminating) {
+        promise.discard();
+        terminate(self());
+      } else {
+        start(); // Re-run the recover protocol after timeout.
+      }
     } else if (future.isFailed()) {
       promise.fail(future.failure());
       terminate(self());
@@ -216,24 +323,37 @@ private:
 
   const size_t quorum;
   const Shared<Network> network;
+  const Metadata::Status status;
+  const bool autoInitialize;
+  const Duration timeout;
 
   set<Future<RecoverResponse> > responses;
   hashmap<Metadata::Status, size_t> responsesReceived;
   Option<uint64_t> lowestBeginPosition;
   Option<uint64_t> highestEndPosition;
   Future<Option<RecoverResponse> > chain;
+  bool terminating;
 
   process::Promise<RecoverResponse> promise;
 };
 
 
-// The wrapper for running the recover protocol.
+// The wrapper for running the recover protocol. We will re-run the
+// recover protocol if it cannot be finished within 'timeout'.
 static Future<RecoverResponse> runRecoverProtocol(
     size_t quorum,
-    const Shared<Network>& network)
+    const Shared<Network>& network,
+    const Metadata::Status& status,
+    bool autoInitialize,
+    const Duration& timeout = Seconds(10))
 {
   RecoverProtocolProcess* process =
-    new RecoverProtocolProcess(quorum, network);
+    new RecoverProtocolProcess(
+        quorum,
+        network,
+        status,
+        autoInitialize,
+        timeout);
 
   Future<RecoverResponse> future = process->future();
   spawn(process, true);
@@ -249,7 +369,9 @@ static Future<RecoverResponse> runRecoverProtocol(
 // the next status is determined to be RECOVERING, we will start doing
 // catch-up. Later, if the local replica has caught-up, we will set
 // the status of the local replica to VOTING and terminate the
-// process, indicating the recovery has completed.
+// process, indicating the recovery has completed. If all replicas are
+// in EMPTY status and auto-initialization is enabled, a two-phase
+// protocol will be used to bootstrap the replicated log.
 //
 // Here, we list a few scenarios and show how the recover process will
 // respond in those scenarios. All the examples assume a quorum size
@@ -273,17 +395,27 @@ static Future<RecoverResponse> runRecoverProtocol(
 // 4) Replica A is in VOTING status and B is in EMPTY status. The
 //    operator adds replica C. In that case, C will stay in EMPTY
 //    status forever similar to case 3).
+//
+// 5) Replica A, B and C are all in EMPTY status. Depending on whether
+//    auto-initialization is enabled or not, the replicas will behave
+//    differently. If auto-initialization is enabled, all replicas
+//    will first go into STARTING status. Once *all* replicas have
+//    transitioned out of EMPTY status, the replicas will go into
+//    VOTING status. If auto-initialization is disabled, all replicas
+//    will remain in EMPTY status.
 class RecoverProcess : public Process<RecoverProcess>
 {
 public:
   RecoverProcess(
       size_t _quorum,
       const Owned<Replica>& _replica,
-      const Shared<Network>& _network)
+      const Shared<Network>& _network,
+      bool _autoInitialize)
     : ProcessBase(ID::generate("log-recover")),
       quorum(_quorum),
       replica(_replica),
-      network(_network) {}
+      network(_network),
+      autoInitialize(_autoInitialize) {}
 
   Future<Owned<Replica> > future() { return promise.future(); }
 
@@ -322,20 +454,39 @@ private:
       // No need to do recovery.
       return Nothing();
     } else {
-      return runRecoverProtocol(quorum, network)
+      return runRecoverProtocol(quorum, network, status, autoInitialize)
         .then(defer(self(), &Self::_recover, lambda::_1));
     }
   }
 
   Future<Nothing> _recover(const RecoverResponse& result)
   {
-    if (result.status() == Metadata::RECOVERING) {
-      CHECK(result.has_begin() && result.has_end());
+    switch (result.status()) {
+      case Metadata::STARTING:
+        // This is the auto-initialization case. As mentioned above, we
+        // use a two-phase protocol to bootstrap. When the control
+        // reaches here, the first phase just ended. We start the second
+        // phase by re-running the recover protocol.
+        CHECK(autoInitialize);
 
-      return updateReplicaStatus(Metadata::RECOVERING)
-        .then(defer(self(), &Self::catchup, result.begin(), result.end()));
-    } else {
-      return Failure("Unexpected status returned from the recover protocol");
+        return updateReplicaStatus(Metadata::STARTING)
+          .then(defer(self(), &Self::recover, Metadata::STARTING));
+
+      case Metadata::VOTING:
+        // This is also the auto-initialization case. When the
+        // control reaches here, the second phase just ended.
+        CHECK(autoInitialize);
+
+        return updateReplicaStatus(Metadata::VOTING);
+
+      case Metadata::RECOVERING:
+        CHECK(result.has_begin() && result.has_end());
+
+        return updateReplicaStatus(Metadata::RECOVERING)
+          .then(defer(self(), &Self::catchup, result.begin(), result.end()));
+
+      default:
+        return Failure("Unexpected status returned from the recover protocol");
     }
   }
 
@@ -410,7 +561,7 @@ private:
 
   Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
   {
-    // Try to regain the ownership of the replica.
+    // Try to re-gain the ownership of the replica.
     return shared.own()
       .then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
   }
@@ -439,6 +590,7 @@ private:
   const size_t quorum;
   Owned<Replica> replica;
   const Shared<Network> network;
+  const bool autoInitialize;
 
   Future<Nothing> chain;
 
@@ -449,9 +601,16 @@ private:
 Future<Owned<Replica> > recover(
     size_t quorum,
     const Owned<Replica>& replica,
-    const Shared<Network>& network)
+    const Shared<Network>& network,
+    bool autoInitialize)
 {
-  RecoverProcess* process = new RecoverProcess(quorum, replica, network);
+  RecoverProcess* process =
+    new RecoverProcess(
+        quorum,
+        replica,
+        network,
+        autoInitialize);
+
   Future<Owned<Replica> > future = process->future();
   spawn(process, true);
   return future;
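
To make the two-phase transition rules described in the comments above concrete, here is a small, self-contained sketch of the decision a replica makes after counting the responses it has received. This is not the actual RecoverProtocolProcess code: a plain enum and std::map stand in for the protobuf Metadata::Status and the stout hashmap, and the function name is hypothetical.

  #include <cstddef>
  #include <map>

  enum Status { EMPTY, STARTING, VOTING, RECOVERING };

  // Given the local replica's current status, the per-status counts of
  // responses received so far, and the quorum size, return the status the
  // local replica should transition to next, or its current status if it
  // should keep waiting (cases B2, B3 and B4 in the protocol comment).
  Status nextStatus(
      Status local,
      std::map<Status, std::size_t> counts,
      std::size_t quorum)
  {
    const std::size_t clusterSize = (2 * quorum) - 1; // quorum 2 -> 3 replicas.

    if (local == EMPTY &&
        counts[EMPTY] + counts[STARTING] >= clusterSize) {
      return STARTING; // Phase one: every replica has at least started.
    }

    if (local == STARTING &&
        counts[STARTING] + counts[VOTING] >= clusterSize) {
      return VOTING; // Phase two: no replica is still EMPTY.
    }

    return local; // Otherwise keep collecting responses (and retry on timeout).
  }

For example, with quorum = 2 an EMPTY replica that has seen responses {EMPTY: 2, STARTING: 1} moves to STARTING, and a STARTING replica that has seen {STARTING: 3} moves to VOTING. No replica can reach VOTING until every replica has left EMPTY, which avoids the stuck state described in the single-phase counter-example above.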

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
index 634bc06..6243c18 100644
--- a/src/log/recover.hpp
+++ b/src/log/recover.hpp
@@ -45,12 +45,17 @@ namespace log {
 // positions are recovered such that if other replicas fail, the
 // remaining replicas can restore all the successfully written log
 // entries; 2) its future votes cannot contradict its lost votes.
+//
 // This function returns an owned pointer to the recovered replica if
-// the recovery is successful.
+// the recovery is successful. If the auto-initialization flag is set,
+// an empty replica will be allowed to vote if ALL replicas (i.e.,
+// quorum * 2 - 1) are empty. This allows us to bootstrap the
+// replicated log without explicitly using an initialization tool.
 extern process::Future<process::Owned<Replica> > recover(
     size_t quorum,
     const process::Owned<Replica>& replica,
-    const process::Shared<Network>& network);
+    const process::Shared<Network>& network,
+    bool autoInitialize = false);
 
 } // namespace log {
 } // namespace internal {
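
As a usage sketch of the declaration above (the new tests below exercise exactly this path, so the setup is abbreviated and the quorum and path are illustrative):

  Owned<Replica> replica(new Replica("/tmp/.log1"));
  Shared<Network> network(new Network(pids)); // 'pids' lists all replicas.

  // With autoInitialize set to true, an EMPTY replica may join an all-EMPTY
  // cluster via the two-phase EMPTY -> STARTING -> VOTING transition.
  Future<Owned<Replica> > recovering = recover(2, replica, network, true);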

http://git-wip-us.apache.org/repos/asf/mesos/blob/27221fd5/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index 4f08927..db91ef8 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -1698,6 +1698,144 @@ TEST_F(RecoverTest, CatchupRetry)
 }
 
 
+TEST_F(RecoverTest, AutoInitialization)
+{
+  const string path1 = os::getcwd() + "/.log1";
+  const string path2 = os::getcwd() + "/.log2";
+  const string path3 = os::getcwd() + "/.log3";
+
+  Owned<Replica> replica1(new Replica(path1));
+  Owned<Replica> replica2(new Replica(path2));
+  Owned<Replica> replica3(new Replica(path3));
+
+  set<UPID> pids;
+  pids.insert(replica1->pid());
+  pids.insert(replica2->pid());
+  pids.insert(replica3->pid());
+
+  Shared<Network> network(new Network(pids));
+
+  Future<Owned<Replica> > recovering1 = recover(2, replica1, network, true);
+  Future<Owned<Replica> > recovering2 = recover(2, replica2, network, true);
+
+  // Verifies that replica1 and replica2 cannot transit into VOTING
+  // status because replica3 is still in EMPTY status. We flush the
+  // event queue before checking.
+  Clock::pause();
+  Clock::settle();
+  Clock::resume();
+
+  EXPECT_TRUE(recovering1.isPending());
+  EXPECT_TRUE(recovering2.isPending());
+
+  Future<Owned<Replica> > recovering3 = recover(2, replica3, network, true);
+
+  AWAIT_READY(recovering1);
+  AWAIT_READY(recovering2);
+  AWAIT_READY(recovering3);
+
+  Owned<Replica> shared_ = recovering1.get();
+  Shared<Replica> shared = shared_.share();
+
+  Coordinator coord(2, shared, network);
+
+  {
+    Future<Option<uint64_t> > electing = coord.elect();
+    AWAIT_READY(electing);
+    EXPECT_SOME_EQ(0u, electing.get());
+  }
+
+  {
+    Future<Option<uint64_t> > appending = coord.append("hello world");
+    AWAIT_READY(appending);
+    EXPECT_SOME_EQ(1u, appending.get());
+  }
+
+  {
+    Future<list<Action> > actions = shared->read(1, 1);
+    AWAIT_READY(actions);
+    ASSERT_EQ(1u, actions.get().size());
+    EXPECT_EQ(1u, actions.get().front().position());
+    ASSERT_TRUE(actions.get().front().has_type());
+    ASSERT_EQ(Action::APPEND, actions.get().front().type());
+    EXPECT_EQ("hello world", actions.get().front().append().bytes());
+  }
+}
+
+
+TEST_F(RecoverTest, AutoInitializationRetry)
+{
+  const string path1 = os::getcwd() + "/.log1";
+  const string path2 = os::getcwd() + "/.log2";
+  const string path3 = os::getcwd() + "/.log3";
+
+  Owned<Replica> replica1(new Replica(path1));
+  Owned<Replica> replica2(new Replica(path2));
+  Owned<Replica> replica3(new Replica(path3));
+
+  set<UPID> pids;
+  pids.insert(replica1->pid());
+  pids.insert(replica2->pid());
+  pids.insert(replica3->pid());
+
+  Shared<Network> network(new Network(pids));
+
+  // Simulate the case where replica3 is temporarily removed.
+  DROP_MESSAGE(Eq(RecoverRequest().GetTypeName()), _, Eq(replica3->pid()));
+  DROP_MESSAGE(Eq(RecoverRequest().GetTypeName()), _, Eq(replica3->pid()));
+
+  Clock::pause();
+
+  Future<Owned<Replica> > recovering1 = recover(2, replica1, network, true);
+  Future<Owned<Replica> > recovering2 = recover(2, replica2, network, true);
+
+  // Flush the event queue.
+  Clock::settle();
+
+  EXPECT_TRUE(recovering1.isPending());
+  EXPECT_TRUE(recovering2.isPending());
+
+  Future<Owned<Replica> > recovering3 = recover(2, replica3, network, true);
+
+  // Replica1 and replica2 will retry recovery after 10 seconds.
+  Clock::advance(Seconds(10));
+  Clock::settle();
+
+  Clock::resume();
+
+  AWAIT_READY(recovering1);
+  AWAIT_READY(recovering2);
+  AWAIT_READY(recovering3);
+
+  Owned<Replica> shared_ = recovering1.get();
+  Shared<Replica> shared = shared_.share();
+
+  Coordinator coord(2, shared, network);
+
+  {
+    Future<Option<uint64_t> > electing = coord.elect();
+    AWAIT_READY(electing);
+    EXPECT_SOME_EQ(0u, electing.get());
+  }
+
+  {
+    Future<Option<uint64_t> > appending = coord.append("hello world");
+    AWAIT_READY(appending);
+    EXPECT_SOME_EQ(1u, appending.get());
+  }
+
+  {
+    Future<list<Action> > actions = shared->read(1, 1);
+    AWAIT_READY(actions);
+    ASSERT_EQ(1u, actions.get().size());
+    EXPECT_EQ(1u, actions.get().front().position());
+    ASSERT_TRUE(actions.get().front().has_type());
+    ASSERT_EQ(Action::APPEND, actions.get().front().type());
+    EXPECT_EQ("hello world", actions.get().front().append().bytes());
+  }
+}
+
+
 class LogTest : public TemporaryDirectoryTest
 {
 protected: