Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:25 UTC

[05/10] git commit: Added log recovery support.

Added log recovery support.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15802


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9b60c4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9b60c4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9b60c4c

Branch: refs/heads/master
Commit: f9b60c4cac503b2d7a1c0cb5403203e619e563e6
Parents: 6ea7c14
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:53:40 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:53:40 2014 -0800

----------------------------------------------------------------------
 src/Makefile.am           |    3 +
 src/common/type_utils.hpp |   25 +
 src/log/coordinator.cpp   |  121 +---
 src/log/coordinator.hpp   |   42 +-
 src/log/log.cpp           | 1192 ++++++++++++++++++++++++----------------
 src/log/log.hpp           |  283 ++--------
 src/log/recover.cpp       |  403 ++++++++++++++
 src/log/recover.hpp       |   59 ++
 src/log/replica.cpp       |  214 ++++++--
 src/log/replica.hpp       |   13 +-
 src/messages/log.proto    |   18 +-
 src/tests/log_tests.cpp   |  419 ++++++++------
 12 files changed, 1727 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
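
This commit replaces the header-only Log implementation with dedicated LogProcess,
LogReaderProcess, and LogWriterProcess actors and gates every read and write on
recovery of the local replica. The public Log::Writer and Log::Reader front ends keep
their blocking, Result-based signatures. The following is only a hedged usage sketch
based on the interfaces in this diff; the quorum size, file path, peer set, and
timeouts are placeholders, not values from the commit.

  #include <set>
  #include <string>

  #include <process/pid.hpp>
  #include <process/process.hpp>
  #include <process/timeout.hpp>

  #include <stout/duration.hpp>
  #include <stout/result.hpp>

  #include "log/log.hpp"

  using namespace mesos::internal::log;

  int main()
  {
    process::initialize();

    // Placeholder configuration: a quorum of 1 backed by a local file and
    // no remote replicas. Real deployments pass the peers' UPIDs (or use
    // the ZooKeeper-based constructor).
    Log log(1, "/tmp/replicated.log", std::set<process::UPID>());

    // The Writer constructor now runs the election loop itself, retrying
    // up to 'retries' times with the given per-attempt timeout; recovery
    // of the local replica happens transparently before the election.
    Log::Writer writer(&log, Seconds(10), 3);

    Result<Log::Position> appended =
      writer.append("hello world", process::Timeout::in(Seconds(10)));

    if (appended.isError()) {
      // Permanent failure (e.g., the writer lost the coordinatorship).
    } else if (appended.isNone()) {
      // Timed out; the append may be retried.
    }

    return 0;
  }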


http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 17fbf83..60fcb31 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -301,11 +301,14 @@ liblog_la_SOURCES =							\
   log/catchup.cpp							\
   log/consensus.cpp							\
   log/coordinator.cpp							\
+  log/log.cpp								\
+  log/recover.cpp							\
   log/replica.cpp
 liblog_la_SOURCES +=							\
   log/catchup.hpp							\
   log/consensus.hpp							\
   log/coordinator.hpp							\
+  log/recover.hpp							\
   log/replica.hpp							\
   log/log.hpp								\
   log/network.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 3b05751..fe6bf71 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -30,6 +30,7 @@
 
 #include "common/attributes.hpp"
 
+#include "messages/log.hpp"
 #include "messages/messages.hpp"
 
 // This file includes definitions for operators on protobuf classes
@@ -371,4 +372,28 @@ inline std::ostream& operator << (
 
 }} // namespace mesos { namespace internal {
 
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const Action::Type& type)
+{
+  return stream << Action::Type_Name(type);
+}
+
+
+inline std::ostream& operator << (
+    std::ostream& stream,
+    const Metadata::Status& status)
+{
+  return stream << Metadata::Status_Name(status);
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos
+
 #endif // __TYPE_UTILS_HPP__
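
The two new operators simply forward to the protobuf-generated *_Name() helpers,
which is what lets the coordinator change below stream action.type() directly into a
log message. A minimal, hypothetical sketch (the Action instance here is constructed
ad hoc for illustration):

  #include <iostream>

  #include "common/type_utils.hpp"
  #include "messages/log.hpp"

  using namespace mesos::internal::log;

  int main()
  {
    Action action;
    action.set_type(Action::APPEND);

    // Resolves to the new operator and prints "APPEND" via
    // Action::Type_Name().
    std::cout << action.type() << std::endl;
    return 0;
  }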

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/coordinator.cpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp
index 21f2865..bc85e66 100644
--- a/src/log/coordinator.cpp
+++ b/src/log/coordinator.cpp
@@ -20,13 +20,13 @@
 
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
-#include <process/future.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
 
-#include <stout/error.hpp>
 #include <stout/none.hpp>
 
+#include "common/type_utils.hpp"
+
 #include "log/catchup.hpp"
 #include "log/consensus.hpp"
 #include "log/coordinator.hpp"
@@ -59,22 +59,10 @@ public:
 
   virtual ~CoordinatorProcess() {}
 
-  // Handles coordinator election. Returns the last committed log
-  // position if the operation succeeds. Returns none if the election
-  // is not successful, but can be retried.
+  // See comments in 'coordinator.hpp'.
   Future<Option<uint64_t> > elect();
-
-  // Handles coordinator demotion. Returns the last committed log
-  // position if the operation succeeds.
   Future<uint64_t> demote();
-
-  // Appends the specified bytes to the end of the log. Returns the
-  // position of the appended entry if the operation succeeds.
   Future<uint64_t> append(const string& bytes);
-
-  // Removes all log entries preceding the log entry at the given
-  // position (to). Returns the position at which the truncate
-  // operation is written if the operation succeeds.
   Future<uint64_t> truncate(uint64_t to);
 
 protected:
@@ -344,8 +332,7 @@ Future<uint64_t> CoordinatorProcess::truncate(uint64_t to)
 
 Future<uint64_t> CoordinatorProcess::write(const Action& action)
 {
-  LOG(INFO) << "Coordinator attempting to write "
-            << Action::Type_Name(action.type())
+  LOG(INFO) << "Coordinator attempting to write " << action.type()
             << " action at position " << action.position();
 
   CHECK_EQ(state, ELECTED);
@@ -455,111 +442,27 @@ Coordinator::~Coordinator()
 }
 
 
-Result<uint64_t> Coordinator::elect(const Timeout& timeout)
+Future<Option<uint64_t> > Coordinator::elect()
 {
-  LOG(INFO) << "Coordinator attempting to get elected within "
-            << timeout.remaining();
-
-  Future<Option<uint64_t> > electing =
-    dispatch(process, &CoordinatorProcess::elect);
-
-  electing.await(timeout.remaining());
-
-  CHECK(!electing.isDiscarded());
-
-  if (electing.isPending()) {
-    LOG(INFO) << "Coordinator timed out while trying to get elected";
-
-    electing.discard();
-    return None();
-  } else if (electing.isFailed()) {
-    LOG(ERROR) << "Coordinator failed to get elected: "
-               << electing.failure();
-
-    return Error(electing.failure());
-  } else {
-    if (electing.get().isNone()) {
-      LOG(INFO) << "Coordinator lost an election, but can be retried";
-
-      return None();
-    } else {
-      LOG(INFO) << "Coordinator elected with current position "
-                << electing.get().get();
-
-      return electing.get().get();
-    }
-  }
+  return dispatch(process, &CoordinatorProcess::elect);
 }
 
 
-Result<uint64_t> Coordinator::demote()
+Future<uint64_t> Coordinator::demote()
 {
-  Future<uint64_t> demoting =
-    dispatch(process, &CoordinatorProcess::demote);
-
-  demoting.await(); // TODO(jieyu): Use a timeout.
-
-  CHECK(!demoting.isDiscarded());
-
-  if (demoting.isFailed()) {
-    return Error(demoting.failure());
-  } else {
-    return demoting.get();
-  }
+  return dispatch(process, &CoordinatorProcess::demote);
 }
 
 
-Result<uint64_t> Coordinator::append(
-    const string& bytes,
-    const Timeout& timeout)
+Future<uint64_t> Coordinator::append(const string& bytes)
 {
-  Future<uint64_t> appending =
-    dispatch(process, &CoordinatorProcess::append, bytes);
-
-  appending.await(timeout.remaining());
-
-  CHECK(!appending.isDiscarded());
-
-  if (appending.isPending()) {
-    LOG(INFO) << "Coordinator timed out while trying to append";
-
-    appending.discard();
-    return None();
-  } else if (appending.isFailed()) {
-    LOG(ERROR) << "Coordinator failed to append the log: "
-               << appending.failure();
-
-    return Error(appending.failure());
-  } else {
-    return appending.get();
-  }
+  return dispatch(process, &CoordinatorProcess::append, bytes);
 }
 
 
-Result<uint64_t> Coordinator::truncate(
-    uint64_t to,
-    const Timeout& timeout)
+Future<uint64_t> Coordinator::truncate(uint64_t to)
 {
-  Future<uint64_t> truncating =
-    dispatch(process, &CoordinatorProcess::truncate, to);
-
-  truncating.await(timeout.remaining());
-
-  CHECK(!truncating.isDiscarded());
-
-  if (truncating.isPending()) {
-    LOG(INFO) << "Coordinator timed out while trying to truncate";
-
-    truncating.discard();
-    return None();
-  } else if (truncating.isFailed()) {
-    LOG(ERROR) << "Coordinator failed to truncate the log: "
-               << truncating.failure();
-
-    return Error(truncating.failure());
-  } else {
-    return truncating.get();
-  }
+  return dispatch(process, &CoordinatorProcess::truncate, to);
 }
 
 } // namespace log {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/coordinator.hpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.hpp b/src/log/coordinator.hpp
index 43cb530..35b68e9 100644
--- a/src/log/coordinator.hpp
+++ b/src/log/coordinator.hpp
@@ -23,10 +23,10 @@
 
 #include <string>
 
+#include <process/future.hpp>
 #include <process/shared.hpp>
-#include <process/timeout.hpp>
 
-#include <stout/result.hpp>
+#include <stout/option.hpp>
 
 #include "log/network.hpp"
 #include "log/replica.hpp"
@@ -49,25 +49,25 @@ public:
 
   ~Coordinator();
 
-  // Handles coordinator election/demotion. A result of none means the
-  // coordinator failed to achieve a quorum (e.g., due to timeout) but
-  // can be retried. A some result returns the last committed log
-  // position.
-  Result<uint64_t> elect(const process::Timeout& timeout);
-  Result<uint64_t> demote();
-
-  // Returns the result of trying to append the specified bytes. A
-  // result of none means the append failed (e.g., due to timeout),
-  // but can be retried.
-  Result<uint64_t> append(
-      const std::string& bytes,
-      const process::Timeout& timeout);
-
-  // Returns the result of trying to truncate the log (from the
-  // beginning to the specified position exclusive). A result of
-  // none means the truncate failed (e.g., due to timeout), but can be
-  // retried.
-  Result<uint64_t> truncate(uint64_t to, const process::Timeout& timeout);
+  // Handles coordinator election. Returns the last committed (a.k.a.,
+  // learned) log position if the operation succeeds. Returns none if
+  // the election is not successful, but can be retried.
+  process::Future<Option<uint64_t> > elect();
+
+  // Handles coordinator demotion. Returns the last committed (a.k.a.,
+  // learned) log position if the operation succeeds. One should only
+  // call this function if the coordinator has been elected, and no
+  // write (append or truncate) is in progress.
+  process::Future<uint64_t> demote();
+
+  // Appends the specified bytes to the end of the log. Returns the
+  // position of the appended entry if the operation succeeds.
+  process::Future<uint64_t> append(const std::string& bytes);
+
+  // Removes all log entries preceding the log entry at the given
+  // position (to). Returns the position at which the truncate
+  // operation is written if the operation succeeds.
+  process::Future<uint64_t> truncate(uint64_t to);
 
 private:
   CoordinatorProcess* process;
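
With the blocking, Result-returning methods replaced by futures, callers now compose
election and writes instead of polling with timeouts; LogWriterProcess in this commit
composes them exactly this way. The following is a hedged sketch of that composition
under assumed inputs: 'quorum', the already-recovered 'replica', and 'network' are
supplied by the caller, and the process and helper names are hypothetical.

  #include <string>

  #include <process/defer.hpp>
  #include <process/future.hpp>
  #include <process/process.hpp>
  #include <process/shared.hpp>

  #include <stout/lambda.hpp>
  #include <stout/option.hpp>

  #include "log/coordinator.hpp"
  #include "log/network.hpp"
  #include "log/replica.hpp"

  using namespace process;
  using namespace mesos::internal::log;

  // Hypothetical process owning a coordinator, mirroring the pattern used
  // by LogWriterProcess in this commit.
  class ExampleWriterProcess : public Process<ExampleWriterProcess>
  {
  public:
    ExampleWriterProcess(
        size_t quorum,
        const Shared<Replica>& replica,
        const Shared<Network>& network)
      : coordinator(new Coordinator(quorum, replica, network)) {}

    virtual ~ExampleWriterProcess()
    {
      // NOTE: demotion is omitted in this sketch.
      delete coordinator;
    }

    Future<uint64_t> electAndAppend(const std::string& bytes)
    {
      return coordinator->elect()
        .then(defer(self(), &Self::_append, bytes, lambda::_1));
    }

  private:
    // Continuation: only append once the election has succeeded.
    Future<uint64_t> _append(
        const std::string& bytes,
        const Option<uint64_t>& position)
    {
      if (position.isNone()) {
        return Failure("Lost the election, but it can be retried");
      }
      return coordinator->append(bytes);
    }

    Coordinator* coordinator;
  };

A caller would spawn this process and dispatch 'electAndAppend' onto it, just as
Log::Writer dispatches onto LogWriterProcess.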

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/log.cpp
----------------------------------------------------------------------
diff --git a/src/log/log.cpp b/src/log/log.cpp
index d057925..e83f822 100644
--- a/src/log/log.cpp
+++ b/src/log/log.cpp
@@ -16,639 +16,893 @@
  * limitations under the License.
  */
 
-// TODO(benh): Optimize LearnedMessage (and the "commit" stage in
-// general) by figuring out a way to not send the entire action
-// contents a second time (should cut bandwidth used in half).
-
-// TODO(benh): Provide a LearnRequest that requests more than one
-// position at a time, and a LearnResponse that returns as many
-// positions as it knows.
-
-// TODO(benh): Implement background catchup: have a new replica that
-// comes online become part of the group but don't respond to promises
-// or writes until it has caught up! The advantage to becoming part of
-// the group is that the new replica can see where the end of the log
-// is in order to continue to catch up.
-
-// TODO(benh): Add tests that deliberatly put the system in a state of
-// inconsistency by doing funky things to the underlying logs. Figure
-// out ways of bringing new replicas online that seem to check the
-// consistency of the other replicas.
-
-#include <list>
-#include <map>
-#include <set>
-#include <string>
-#include <vector>
-
-#include <boost/lexical_cast.hpp>
-
+#include <process/defer.hpp>
 #include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
 #include <process/process.hpp>
-#include <process/run.hpp>
+#include <process/shared.hpp>
 
 #include <stout/check.hpp>
-#include <stout/duration.hpp>
-#include <stout/fatal.hpp>
+#include <stout/error.hpp>
 #include <stout/foreach.hpp>
-#include <stout/os.hpp>
-#include <stout/result.hpp>
-
-#include "zookeeper/zookeeper.hpp"
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/set.hpp>
 
 #include "log/coordinator.hpp"
+#include "log/log.hpp"
+#include "log/network.hpp"
+#include "log/recover.hpp"
 #include "log/replica.hpp"
 
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::log;
-
 using namespace process;
 
 using std::list;
-using std::map;
-using std::pair;
 using std::set;
 using std::string;
-using std::vector;
 
+namespace mesos {
+namespace internal {
+namespace log {
 
-// class Drop : public Filter
-// {
-// public:
-//   Drop
-//   virtual bool filter(Message* message)
-//   {
-//     return  == message->name;
-//   }
-// };
+class LogProcess : public Process<LogProcess>
+{
+public:
+  LogProcess(
+      size_t _quorum,
+      const string& path,
+      const set<UPID>& pids);
+
+  LogProcess(
+      size_t _quorum,
+      const string& path,
+      const string& servers,
+      const Duration& timeout,
+      const string& znode,
+      const Option<zookeeper::Authentication>& auth);
+
+  // Recovers the log by catching up if needed. Returns a shared
+  // pointer to the local replica if the recovery succeeds.
+  Future<Shared<Replica> > recover();
 
+protected:
+  virtual void initialize();
+  virtual void finalize();
 
-// class PeriodicFilter
+private:
+  friend class LogReaderProcess;
+  friend class LogWriterProcess;
 
+  // Continuations.
+  void _recover();
 
-char** args; // Command line arguments for doing a restart.
+  // TODO(benh): Factor this out into "membership renewer".
+  void watch(
+      const UPID& pid,
+      const set<zookeeper::Group::Membership>& memberships);
 
+  void failed(const string& message);
+  void discarded();
 
-void restart()
-{
-  LOG(INFO) << "Restarting ...";
-  execv(args[0], args);
-  fatalerror("Failed to exec");
-}
+  const size_t quorum;
+  Shared<Replica> replica;
+  Shared<Network> network;
 
+  // For replica recovery.
+  Option<Future<Owned<Replica> > > recovering;
+  process::Promise<Nothing> recovered;
+  list<process::Promise<Shared<Replica> >*> promises;
 
-bool coordinate(Coordinator* coordinator,
-                uint64_t id,
-                int end,
-                map<int, int> truncations)
+  // For renewing membership. We store a Group instance in order to
+  // continually renew the replicas membership (when using ZooKeeper).
+  zookeeper::Group* group;
+  Future<zookeeper::Group::Membership> membership;
+};
+
+
+class LogReaderProcess : public Process<LogReaderProcess>
 {
-  const int attempts = 3;
+public:
+  LogReaderProcess(Log* log);
 
-  uint64_t index;
+  Future<Log::Position> beginning();
+  Future<Log::Position> ending();
 
-  int attempt = 1;
-  while (true) {
-    Result<uint64_t> result = coordinator->elect(id);
-    if (result.isError()) {
-      restart();
-    } else if (result.isNone()) {
-      if (attempt == attempts) {
-        restart();
-      } else {
-        attempt++;
-        os::sleep(Seconds(1));
-      }
-    } else {
-      CHECK_SOME(result);
-      index = result.get();
-      break;
-    }
-  }
+  Future<list<Log::Entry> > read(
+      const Log::Position& from,
+      const Log::Position& to);
 
-  uint64_t value = 0;
-
-  if (index != 0) {
-    attempt = 1;
-    while (true) {
-      Result<list<pair<uint64_t, string> > > result =
-        coordinator->read(index, index);
-      if (result.isError()) {
-        LOG(INFO) << "Restarting due to read error";
-        restart();
-      } else if (result.isNone()) {
-        if (attempt == attempts) {
-          LOG(INFO) << "Restarting after too many attempts";
-          restart();
-        } else {
-          attempt++;
-          os::sleep(Seconds(1));
-        }
-      } else {
-        CHECK_SOME(result);
-        const list<pair<uint64_t, string> >& list = result.get();
-        if (list.size() != 1) {
-          index--;
-        } else {
-          try {
-            value = boost::lexical_cast<uint64_t>(list.front().second);
-          } catch (boost::bad_lexical_cast&) {
-            LOG(INFO) << "Restarting due to conversion error";
-            restart();
-          }
-          break;
-        }
-      }
-    }
-  }
+protected:
+  virtual void initialize();
+  virtual void finalize();
 
-  value++;
-
-  srand(time(NULL));
-
-  int writes = rand() % 500;
-
-  LOG(INFO) << "Attempting to do " << writes << " writes";
-
-  attempt = 1;
-  while (writes > 0 && value <= end) {
-    if (truncations.count(value) > 0) {
-      int to = truncations[value];
-      Result<uint64_t> result = coordinator->truncate(to);
-      if (result.isError()) {
-        LOG(INFO) << "Restarting due to truncate error";
-        restart();
-      } else if (result.isNone()) {
-        if (attempt == attempts) {
-          LOG(INFO) << "Restarting after too many attempts";
-          restart();
-        } else {
-          attempt++;
-          os::sleep(Seconds(1));
-          continue;
-        }
-      } else {
-        CHECK_SOME(result);
-        LOG(INFO) << "Truncated to " << to;
-        os::sleep(Seconds(1));
-        attempt = 1;
-      }
-    }
+private:
+  // Returns a position from a raw value.
+  static Log::Position position(uint64_t value);
 
-    Result<uint64_t> result = coordinator->append(stringify(value));
-    if (result.isError()) {
-      LOG(INFO) << "Restarting due to append error";
-      restart();
-    } else if (result.isNone()) {
-      if (attempt == attempts) {
-        LOG(INFO) << "Restarting after too many attempts";
-        restart();
-      } else {
-        attempt++;
-        os::sleep(Seconds(1));
-      }
-    } else {
-      CHECK_SOME(result);
-      LOG(INFO) << "Wrote " << value;
-      os::sleep(Seconds(1));
-      writes--;
-      value++;
-      attempt = 1;
-    }
-  }
+  // Returns a future which gets set when the log recovery has
+  // finished (either succeeded or failed).
+  Future<Nothing> recover();
 
-  exit(0);
-  return true;
-}
+  // Continuations.
+  void _recover();
 
+  Future<Log::Position> _beginning();
+  Future<Log::Position> _ending();
 
-class LogProcess : public Process<LogProcess>
+  Future<list<Log::Entry> > _read(
+      const Log::Position& from,
+      const Log::Position& to);
+
+  Future<list<Log::Entry> > __read(
+      const Log::Position& from,
+      const Log::Position& to,
+      const list<Action>& actions);
+
+  Future<Shared<Replica> > recovering;
+  list<process::Promise<Nothing>*> promises;
+};
+
+
+class LogWriterProcess : public Process<LogWriterProcess>
 {
 public:
-  LogProcess(int _quorum,
-             const string& _file,
-             const string& _servers,
-             const string& _znode,
-             int _end,
-             const map<int, int>& _truncations);
-
-  virtual ~LogProcess();
-
-  // ZooKeeper events. TODO(*): Use a ZooKeeper listener?
-  void connected();
-  void reconnecting();
-  void reconnected();
-  void expired();
-  void updated(const string& path);
+  LogWriterProcess(Log* log);
+
+  Future<Option<Log::Position> > elect();
+  Future<Log::Position> append(const string& bytes);
+  Future<Log::Position> truncate(const Log::Position& to);
 
 protected:
-  virtual void initialze();
+  virtual void initialize();
+  virtual void finalize();
 
 private:
-  // Updates the group.
-  void regroup();
+  // Returns a position from a raw value.
+  static Log::Position position(uint64_t value);
 
-  // Runs an election.
-  void elect();
+  // Returns a future which gets set when the log recovery has
+  // finished (either succeeded or failed).
+  Future<Nothing> recover();
 
-  // ZooKeeper bits and pieces.
-  string servers;
-  string znode;
-  ZooKeeper* zk;
-  Watcher* watcher;
+  // Continuations.
+  void _recover();
 
-  // Size of quorum.
-  int quorum;
+  Future<Option<Log::Position> > _elect();
+  Option<Log::Position> __elect(const Option<uint64_t>& result);
 
-  // Log file.
-  string file;
+  void failed(const string& message);
 
-  // Termination value (when to stop writing to the log).
-  int end;
+  const size_t quorum;
+  const Shared<Network> network;
 
-  // Truncation points.
-  map<int, int> truncations;
+  Future<Shared<Replica> > recovering;
+  list<process::Promise<Nothing>*> promises;
 
-  // Coordinator id.
-  uint64_t id;
+  Coordinator* coordinator;
+  Option<string> error;
+};
 
-  // Whether or not the coordinator has been elected.
-  bool elected;
 
-  // Group members.
-  set<UPID> members;
+/////////////////////////////////////////////////
+// Implementation of LogProcess.
+/////////////////////////////////////////////////
 
-  ReplicaProcess* replica;
-  GroupProcess* group;
-  Coordinator* coordinator;
-};
+
+LogProcess::LogProcess(
+    size_t _quorum,
+    const string& path,
+    const set<UPID>& pids)
+  : ProcessBase(ID::generate("log")),
+    quorum(_quorum),
+    replica(new Replica(path)),
+    network(new Network(pids + (UPID) replica->pid())),
+    group(NULL) {}
+
+
+LogProcess::LogProcess(
+    size_t _quorum,
+    const string& path,
+    const string& servers,
+    const Duration& timeout,
+    const string& znode,
+    const Option<zookeeper::Authentication>& auth)
+  : ProcessBase(ID::generate("log")),
+    quorum(_quorum),
+    replica(new Replica(path)),
+    network(new ZooKeeperNetwork(servers, timeout, znode, auth)),
+    group(new zookeeper::Group(servers, timeout, znode, auth)) {}
 
 
-class LogProcessWatcher : public Watcher
+void LogProcess::initialize()
 {
-public:
-  LogProcessWatcher(const PID<LogProcess>& _pid)
-    : pid(_pid), reconnect(false) {}
-
-  virtual ~LogProcessWatcher() {}
-
-  virtual void process(ZooKeeper* zk, int type, int state, const string& path)
-  {
-    if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
-      // Check if this is a reconnect.
-      if (!reconnect) {
-        // Initial connect.
-        dispatch(pid, &LogProcess::connected);
-      } else {
-        // Reconnected.
-        dispatch(pid, &LogProcess::reconnected);
-      }
-    } else if ((state == ZOO_CONNECTING_STATE) &&
-               (type == ZOO_SESSION_EVENT)) {
-      // The client library automatically reconnects, taking into
-      // account failed servers in the connection string,
-      // appropriately handling the "herd effect", etc.
-      reconnect = true;
-      dispatch(pid, &LogProcess::reconnecting);
-    } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
-               (type == ZOO_SESSION_EVENT)) {
-      dispatch(pid, &LogProcess::expired);
-
-      // If this watcher is reused, the next connect won't be a reconnect.
-      reconnect = false;
-    } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
-      dispatch(pid, &LogProcess::updated, path);
-    } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
-      dispatch(pid, &LogProcess::updated, path);
-    } else {
-      LOG(FATAL) << "Unimplemented ZooKeeper event: (state is "
-                 << state << " and type is " << type << ")";
-    }
+  if (group != NULL) {
+    // Need to add our replica to the ZooKeeper group!
+    LOG(INFO) << "Attempting to join replica to ZooKeeper group";
+
+    membership = group->join(replica->pid())
+      .onFailed(defer(self(), &Self::failed, lambda::_1))
+      .onDiscarded(defer(self(), &Self::discarded));
+
+    // We save and pass the pid of the replica to the 'watch' function
+    // because the field member 'replica' is not available during
+    // recovery. We need the pid to renew the replicas membership.
+    group->watch()
+      .onReady(defer(self(), &Self::watch, replica->pid(), lambda::_1))
+      .onFailed(defer(self(), &Self::failed, lambda::_1))
+      .onDiscarded(defer(self(), &Self::discarded));
   }
 
-private:
-  const PID<LogProcess> pid;
-  bool reconnect;
-};
+  // Start the recovery.
+  recover();
+}
+
+
+void LogProcess::finalize()
+{
+  if (recovering.isSome()) {
+    // Stop the recovery if it is still pending.
+    recovering.get().discard();
+  }
 
+  // If there exist operations that are gated by the recovery, we fail
+  // all of them because the log is being deleted.
+  foreach (process::Promise<Shared<Replica> >* promise, promises) {
+    promise->fail("Log is being deleted");
+    delete promise;
+  }
+  promises.clear();
 
-LogProcess::LogProcess(int _quorum,
-                       const string& _file,
-                       const string& _servers,
-                       const string& _znode,
-                       int _end,
-                       const map<int, int>& _truncations)
-  : quorum(_quorum),
-    file(_file),
-    servers(_servers),
-    znode(_znode),
-    end(_end),
-    truncations(_truncations),
-    id(0),
-    elected(false),
-    replica(NULL),
-    group(NULL),
-    coordinator(NULL) {}
-
-
-LogProcess::~LogProcess()
-{
-  delete zk;
-  delete watcher;
-  delete replica;
   delete group;
-  delete coordinator;
+
+  // Wait for the shared pointers 'network' and 'replica' to become
+  // unique (i.e., no other reference to them). These calls should not
+  // be blocking for too long because at this moment, all operations
+  // should have been cancelled or are being cancelled. We do this
+  // because we want to make sure that after the log is deleted, all
+  // operations associated with this log are terminated.
+  network.own().await();
+  replica.own().await();
 }
 
 
-void LogProcess::connected()
+Future<Shared<Replica> > LogProcess::recover()
 {
-  LOG(INFO) << "Log connected to ZooKeeper";
+  // The future 'recovered' is used to mark the success (or the
+  // failure) of the recovery. We do not use the future 'recovering'
+  // to do that because it can be set in other process and thus has a
+  // race condition which we want to avoid. We deliberately do not
+  // save replica in 'recovered' because it will complicate our
+  // deleting logic (see 'finalize').
+  Future<Nothing> future = recovered.future();
+
+  if (future.isDiscarded()) {
+    return Failure("Not expecting discarded future");
+  } else if (future.isFailed()) {
+    return Failure(future.failure());
+  } else if (future.isReady()) {
+    return replica;
+  }
+
+  // Recovery has not finished yet. Create a promise and queue it such
+  // that it can get notified once the recovery has finished (either
+  // succeeded or failed).
+  process::Promise<Shared<Replica> >* promise =
+    new process::Promise<Shared<Replica> >();
+
+  promises.push_back(promise);
+
+  if (recovering.isNone()) {
+    // TODO(jieyu): At this moment, we haven't shared 'replica' to
+    // others yet. Therefore, the following 'replica.own()' call
+    // should not be blocking. In the future, we may wanna support
+    // 'release' in Shared which will provide this CHECK internally.
+    CHECK(replica.unique());
+
+    recovering = log::recover(quorum, replica.own().get(), network)
+      .onAny(defer(self(), &Self::_recover));
+  }
+
+  return promise->future();
+}
+
 
-  int ret;
-  string result;
+void LogProcess::_recover()
+{
+  CHECK_SOME(recovering);
 
-  // Assume the znode that was created does not end with a "/".
-  CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+  Future<Owned<Replica> > future = recovering.get();
 
-  // Create directory path znodes as necessary.
-  size_t index = znode.find("/", 0);
+  if (!future.isReady()) {
+    // The 'future' here can only be discarded in 'finalize'.
+    string failure = future.isFailed() ?
+      future.failure() :
+      "The future 'recovering' is unexpectedly discarded";
 
-  while (index < string::npos) {
-    // Get out the prefix to create.
-    index = znode.find("/", index + 1);
-    string prefix = znode.substr(0, index);
+    // Mark the failure of the recovery.
+    recovered.fail(failure);
 
-    LOG(INFO) << "Log trying to create znode '"
-              << prefix << "' in ZooKeeper";
+    foreach (process::Promise<Shared<Replica> >* promise, promises) {
+      promise->fail(failure);
+      delete promise;
+    }
+    promises.clear();
+  } else {
+    replica = future.get().share();
 
-    // Create the node (even if it already exists).
-    ret = zk->create(
-        prefix,
-        "",
-        ZOO_OPEN_ACL_UNSAFE,
-        // ZOO_CREATOR_ALL_ACL, // needs authentication
-        0,
-        &result);
+    // Mark the success of the recovery.
+    recovered.set(Nothing());
 
-    if (ret != ZOK && ret != ZNODEEXISTS) {
-      LOG(FATAL) << "Failed to create '" << prefix
-                 << "' in ZooKeeper: " << zk->message(ret);
+    foreach (process::Promise<Shared<Replica> >* promise, promises) {
+      promise->set(replica);
+      delete promise;
     }
+    promises.clear();
   }
+}
 
-  // Now create the "replicas" znode.
-  LOG(INFO) << "Log trying to create znode '" << znode
-            << "/replicas" << "' in ZooKeeper";
 
-  // Create the node (even if it already exists).
-  ret = zk->create(znode + "/replicas", "", ZOO_OPEN_ACL_UNSAFE,
-                   // ZOO_CREATOR_ALL_ACL, // needs authentication
-                   0, &result);
+void LogProcess::watch(
+    const UPID& pid,
+    const set<zookeeper::Group::Membership>& memberships)
+{
+  if (membership.isReady() && memberships.count(membership.get()) == 0) {
+    // Our replica's membership must have expired, join back up.
+    LOG(INFO) << "Renewing replica group membership";
 
-  if (ret != ZOK && ret != ZNODEEXISTS) {
-    LOG(FATAL) << "Failed to create '" << znode << "/replicas"
-               << "' in ZooKeeper: " << zk->message(ret);
+    membership = group->join(pid)
+      .onFailed(defer(self(), &Self::failed, lambda::_1))
+      .onDiscarded(defer(self(), &Self::discarded));
   }
 
-  // Now create the "coordinators" znode.
-  LOG(INFO) << "Log trying to create znode '" << znode
-            << "/coordinators" << "' in ZooKeeper";
+  group->watch(memberships)
+    .onReady(defer(self(), &Self::watch, pid, lambda::_1))
+    .onFailed(defer(self(), &Self::failed, lambda::_1))
+    .onDiscarded(defer(self(), &Self::discarded));
+}
 
-  // Create the node (even if it already exists).
-  ret = zk->create(znode + "/coordinators", "", ZOO_OPEN_ACL_UNSAFE,
-                   // ZOO_CREATOR_ALL_ACL, // needs authentication
-                   0, &result);
 
-  if (ret != ZOK && ret != ZNODEEXISTS) {
-    LOG(FATAL) << "Failed to create '" << znode << "/coordinators"
-               << "' in ZooKeeper: " << zk->message(ret);
-  }
+void LogProcess::failed(const string& message)
+{
+  LOG(FATAL) << "Failed to participate in ZooKeeper group: " << message;
+}
 
-  // Okay, create our replica, group, and coordinator.
-  replica = new ReplicaProcess(file);
-  spawn(replica);
 
-  group = new GroupProcess();
-  spawn(group);
+void LogProcess::discarded()
+{
+  LOG(FATAL) << "Not expecting future to get discarded!";
+}
 
-  coordinator = new Coordinator(quorum, replica, group);
 
-  // Set a watch on the replicas.
-  ret = zk->getChildren(znode + "/replicas", true, NULL);
+/////////////////////////////////////////////////
+// Implementation of LogReaderProcess.
+/////////////////////////////////////////////////
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
-               << "' in ZooKeeper: " << zk->message(ret);
-  }
 
-  // Set a watch on the coordinators.
-  ret = zk->getChildren(znode + "/coordinators", true, NULL);
+LogReaderProcess::LogReaderProcess(Log* log)
+  : ProcessBase(ID::generate("log-reader")),
+    recovering(dispatch(log->process, &LogProcess::recover)) {}
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
-               << "' in ZooKeeper: " << zk->message(ret);
-  }
 
-  // Add an ephemeral znode for our replica and coordinator.
-  ret = zk->create(znode + "/replicas/", replica->self(), ZOO_OPEN_ACL_UNSAFE,
-                   // ZOO_CREATOR_ALL_ACL, // needs authentication
-                   ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+void LogReaderProcess::initialize()
+{
+  recovering.onAny(defer(self(), &Self::_recover));
+}
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to create an ephmeral node at '" << znode
-               << "/replica/" << "' in ZooKeeper: " << zk->message(ret);
+
+void LogReaderProcess::finalize()
+{
+  foreach (process::Promise<Nothing>* promise, promises) {
+    promise->fail("Log reader is being deleted");
+    delete promise;
   }
+  promises.clear();
+}
 
-  ret = zk->create(znode + "/coordinators/", "", ZOO_OPEN_ACL_UNSAFE,
-                   // ZOO_CREATOR_ALL_ACL, // needs authentication
-                   ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to create an ephmeral node at '" << znode
-               << "/replica/" << "' in ZooKeeper: " << zk->message(ret);
+Future<Nothing> LogReaderProcess::recover()
+{
+  if (recovering.isReady()) {
+    return Nothing();
+  } else if (recovering.isFailed()) {
+    return Failure(recovering.failure());
+  } else if (recovering.isDiscarded()) {
+    return Failure("The future 'recovering' is unexpectedly discarded");
   }
 
-  // Save the sequence id but only grab the basename, e.g.,
-  // "/path/to/znode/000000131" => "000000131".
-  result = utils::os::basename(result);
+  // At this moment, the future 'recovering' should most likely be
+  // pending. But it is also likely that it gets set after the above
+  // checks. Either way, we know that the continuation '_recover' has
+  // not been called yet (otherwise, we should not be able to reach
+  // here). The promise we are creating below will be properly
+  // set/failed when '_recover' is called.
+  process::Promise<Nothing>* promise = new process::Promise<Nothing>();
+  promises.push_back(promise);
+  return promise->future();
+}
+
 
-  try {
-    id = boost::lexical_cast<uint64_t>(result);
-  } catch (boost::bad_lexical_cast&) {
-    LOG(FATAL) << "Failed to convert '" << result << "' into an integer";
+void LogReaderProcess::_recover()
+{
+  if (!recovering.isReady()) {
+    foreach (process::Promise<Nothing>* promise, promises) {
+      promise->fail(
+          recovering.isFailed() ?
+          recovering.failure() :
+          "The future 'recovering' is unexpectedly discarded");
+      delete promise;
+    }
+    promises.clear();
+  } else {
+    foreach (process::Promise<Nothing>* promise, promises) {
+      promise->set(Nothing());
+      delete promise;
+    }
+    promises.clear();
   }
+}
+
 
-  // Run an election!
-  elect();
+Future<Log::Position> LogReaderProcess::beginning()
+{
+  return recover().then(defer(self(), &Self::_beginning));
 }
 
 
-void LogProcess::reconnecting()
+Future<Log::Position> LogReaderProcess::_beginning()
 {
-  LOG(INFO) << "Reconnecting to ZooKeeper";
+  CHECK(recovering.isReady());
+
+  return recovering.get()->beginning()
+    .then(lambda::bind(&Self::position, lambda::_1));
 }
 
 
-void LogProcess::reconnected()
+Future<Log::Position> LogReaderProcess::ending()
 {
-  LOG(INFO) << "Reconnected to ZooKeeper";
+  return recover().then(defer(self(), &Self::_ending));
 }
 
 
-void LogProcess::expired()
+Future<Log::Position> LogReaderProcess::_ending()
 {
-  restart();
+  CHECK(recovering.isReady());
+
+  return recovering.get()->ending()
+    .then(lambda::bind(&Self::position, lambda::_1));
 }
 
 
-void LogProcess::updated(const string& path)
+Future<list<Log::Entry> > LogReaderProcess::read(
+    const Log::Position& from,
+    const Log::Position& to)
 {
-  if (znode + "/replicas" == path) {
+  return recover().then(defer(self(), &Self::_read, from, to));
+}
 
-    regroup();
 
-    // Reset a watch on the replicas.
-    int ret = zk->getChildren(znode + "/replicas", true, NULL);
+Future<list<Log::Entry> > LogReaderProcess::_read(
+    const Log::Position& from,
+    const Log::Position& to)
+{
+  CHECK(recovering.isReady());
 
-    if (ret != ZOK) {
-      LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
-                 << "' in ZooKeeper: " << zk->message(ret);
-    }
-  } else {
-    CHECK(znode + "/coordinators" == path);
+  return recovering.get()->read(from.value, to.value)
+    .then(defer(self(), &Self::__read, from, to, lambda::_1));
+}
 
-    elect();
 
-    // Reset a watch on the coordinators.
-    int ret = zk->getChildren(znode + "/coordinators", true, NULL);
+Future<list<Log::Entry> > LogReaderProcess::__read(
+    const Log::Position& from,
+    const Log::Position& to,
+    const list<Action>& actions)
+{
+  list<Log::Entry> entries;
+
+  uint64_t position = from.value;
+
+  foreach (const Action& action, actions) {
+    // Ensure read range is valid.
+    if (!action.has_performed() ||
+        !action.has_learned() ||
+        !action.learned()) {
+      return Failure("Bad read range (includes pending entries)");
+    } else if (position++ != action.position()) {
+      return Failure("Bad read range (includes missing entries)");
+    }
 
-    if (ret != ZOK) {
-      LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
-                 << "' in ZooKeeper: " << zk->message(ret);
+    // And only return appends.
+    CHECK(action.has_type());
+    if (action.type() == Action::APPEND) {
+      entries.push_back(Log::Entry(action.position(), action.append().bytes()));
     }
   }
+
+  return entries;
 }
 
 
-void LogProcess::initalize()
+Log::Position LogReaderProcess::position(uint64_t value)
 {
-  // TODO(benh): Real testing requires injecting a ZooKeeper instance.
-  watcher = new LogProcessWatcher(self());
-  zk = new ZooKeeper(servers, 10000, watcher);
+  return Log::Position(value);
 }
 
 
-void LogProcess::regroup()
+/////////////////////////////////////////////////
+// Implementation of LogWriterProcess.
+/////////////////////////////////////////////////
+
+
+LogWriterProcess::LogWriterProcess(Log* log)
+  : ProcessBase(ID::generate("log-writer")),
+    quorum(log->process->quorum),
+    network(log->process->network),
+    recovering(dispatch(log->process, &LogProcess::recover)),
+    coordinator(NULL),
+    error(None()) {}
+
+
+void LogWriterProcess::initialize()
 {
-  vector<string> results;
+  recovering.onAny(defer(self(), &Self::_recover));
+}
 
-  int ret = zk->getChildren(znode + "/replicas", false, &results);
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to get children of '" << znode << "/replicas"
-               << "' in ZooKeeper: " << zk->message(ret);
+void LogWriterProcess::finalize()
+{
+  foreach (process::Promise<Nothing>* promise, promises) {
+    promise->fail("Log writer is being deleted");
+    delete promise;
   }
+  promises.clear();
+
+  delete coordinator;
+}
 
-  set<UPID> current;
-  set<UPID> added;
-  set<UPID> removed;
 
-  foreach (const string& result, results) {
-    string s;
-    int ret = zk->get(znode + "/replicas/" + result, false, &s, NULL);
-    UPID pid = s;
-    current.insert(pid);
+Future<Nothing> LogWriterProcess::recover()
+{
+  if (recovering.isReady()) {
+    return Nothing();
+  } else if (recovering.isFailed()) {
+    return Failure(recovering.failure());
+  } else if (recovering.isDiscarded()) {
+    return Failure("The future 'recovering' is unexpectedly discarded");
   }
 
-  foreach (const UPID& pid, current) {
-    if (members.count(pid) == 0) {
-      added.insert(pid);
+  // At this moment, the future 'recovering' should most likely be
+  // pending. But it is also likely that it gets set after the above
+  // checks. Either way, we know that the continuation '_recover' has
+  // not been called yet (otherwise, we should not be able to reach
+  // here). The promise we are creating below will be properly
+  // set/failed when '_recover' is called.
+  process::Promise<Nothing>* promise = new process::Promise<Nothing>();
+  promises.push_back(promise);
+  return promise->future();
+}
+
+
+void LogWriterProcess::_recover()
+{
+  if (!recovering.isReady()) {
+    foreach (process::Promise<Nothing>* promise, promises) {
+      promise->fail(
+          recovering.isFailed() ?
+          recovering.failure() :
+          "The future 'recovering' is unexpectedly discarded");
+      delete promise;
+    }
+    promises.clear();
+  } else {
+    foreach (process::Promise<Nothing>* promise, promises) {
+      promise->set(Nothing());
+      delete promise;
     }
+    promises.clear();
   }
+}
 
-  foreach (const UPID& pid, members) {
-    if (current.count(pid) == 0) {
-      removed.insert(pid);
-    }
+
+Future<Option<Log::Position> > LogWriterProcess::elect()
+{
+  return recover().then(defer(self(), &Self::_elect));
+}
+
+
+Future<Option<Log::Position> > LogWriterProcess::_elect()
+{
+  // We delete the existing coordinator (if exists) and create a new
+  // coordinator each time 'elect' is called.
+  delete coordinator;
+  error = None();
+
+  CHECK(recovering.isReady());
+
+  coordinator = new Coordinator(quorum, recovering.get(), network);
+
+  return coordinator->elect()
+    .then(defer(self(), &Self::__elect, lambda::_1))
+    .onFailed(defer(self(), &Self::failed, lambda::_1));
+}
+
+
+Option<Log::Position> LogWriterProcess::__elect(const Option<uint64_t>& result)
+{
+  if (result.isNone()) {
+    return None();
+  } else {
+    return position(result.get());
+  }
+}
+
+
+Future<Log::Position> LogWriterProcess::append(const string& bytes)
+{
+  if (coordinator == NULL) {
+    return Failure("No election has been performed");
   }
 
-  foreach (const UPID& pid, added) {
-    dispatch(group, &GroupProcess::add, pid);
-    members.insert(pid);
+  if (error.isSome()) {
+    return Failure(error.get());
   }
 
-  foreach (const UPID& pid, removed) {
-    dispatch(group, &GroupProcess::remove, pid);
-    members.erase(pid);
+  return coordinator->append(bytes)
+    .then(lambda::bind(&Self::position, lambda::_1))
+    .onFailed(defer(self(), &Self::failed, lambda::_1));
+}
+
+
+Future<Log::Position> LogWriterProcess::truncate(const Log::Position& to)
+{
+  if (coordinator == NULL) {
+    return Failure("No election has been performed");
   }
+
+  if (error.isSome()) {
+    return Failure(error.get());
+  }
+
+  return coordinator->truncate(to.value)
+    .then(lambda::bind(&Self::position, lambda::_1))
+    .onFailed(defer(self(), &Self::failed, lambda::_1));
 }
 
 
-void LogProcess::elect()
+void LogWriterProcess::failed(const string& message)
 {
-  vector<string> results;
+  error = message;
+}
 
-  int ret = zk->getChildren(znode + "/coordinators", false, &results);
 
-  if (ret != ZOK) {
-    LOG(FATAL) << "Failed to get children of '" << znode << "/coordinators"
-               << "' in ZooKeeper: " << zk->message(ret);
-  }
+Log::Position LogWriterProcess::position(uint64_t value)
+{
+  return Log::Position(value);
+}
+
+
+/////////////////////////////////////////////////
+// Public interfaces for Log.
+/////////////////////////////////////////////////
+
+
+Log::Log(
+    int quorum,
+    const string& path,
+    const set<UPID>& pids)
+{
+  GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+  process = new LogProcess(quorum, path, pids);
+  spawn(process);
+}
+
+Log::Log(
+    int quorum,
+    const string& path,
+    const string& servers,
+    const Duration& timeout,
+    const string& znode,
+    const Option<zookeeper::Authentication>& auth)
+{
+  GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+  process = new LogProcess(quorum, path, servers, timeout, znode, auth);
+  spawn(process);
+}
+
+
+Log::~Log()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
 
-  // "Elect" the minimum ephemeral znode.
-  uint64_t min = LONG_MAX;
-  foreach (const string& result, results) {
-    try {
-      min = std::min(min, boost::lexical_cast<uint64_t>(result));
-    } catch (boost::bad_lexical_cast&) {
-      LOG(FATAL) << "Failed to convert '" << result << "' into an integer";
+/////////////////////////////////////////////////
+// Public interfaces for Log::Reader.
+/////////////////////////////////////////////////
+
+
+Log::Reader::Reader(Log* log)
+{
+  process = new LogReaderProcess(log);
+  spawn(process);
+}
+
+
+Log::Reader::~Reader()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Result<list<Log::Entry> > Log::Reader::read(
+    const Log::Position& from,
+    const Log::Position& to,
+    const Timeout& timeout)
+{
+  Future<list<Log::Entry> > future =
+    dispatch(process, &LogReaderProcess::read, from, to);
+
+  if (!future.await(timeout.remaining())) {
+    LOG(INFO) << "Timed out while trying to read the log";
+
+    future.discard();
+    return None();
+  } else {
+    if (!future.isReady()) {
+      string failure =
+        future.isFailed() ?
+        future.failure() :
+        "Not expecting discarded future";
+
+      LOG(ERROR) << "Failed to read the log: " << failure;
+
+      return Error(failure);
+    } else {
+      return future.get();
     }
   }
+}
 
-  if (id == min && !elected) {
-    elected = true;
-    process::run(&coordinate, coordinator, id, end, truncations);
-  } else if (elected) {
-    LOG(INFO) << "Restarting due to demoted";
-    restart();
-  }
+
+Log::Position Log::Reader::beginning()
+{
+  // TODO(benh): Take a timeout and return an Option.
+  return dispatch(process, &LogReaderProcess::beginning).get();
 }
 
 
-int main(int argc, char** argv)
+Log::Position Log::Reader::ending()
 {
-  if (argc < 6) {
-    fatal("Usage: %s <quorum> <file> <servers> <znode> <end> <at> <to> ...",
-          argv[0]);
-  }
+  // TODO(benh): Take a timeout and return an Option.
+  return dispatch(process, &LogReaderProcess::ending).get();
+}
 
-  args = argv;
 
-  int quorum = atoi(argv[1]);
-  string file = argv[2];
-  string servers = argv[3];
-  string znode = argv[4];
-  int end = atoi(argv[5]);
+/////////////////////////////////////////////////
+// Public interfaces for Log::Writer.
+/////////////////////////////////////////////////
 
-  map<int, int> truncations;
 
-  for (int i = 6; argv[i] != NULL; i += 2) {
-    if (argv[i + 1] == NULL) {
-      fatal("Expecting 'to' argument for truncation");
+Log::Writer::Writer(Log* log, const Duration& timeout, int retries)
+{
+  process = new LogWriterProcess(log);
+  spawn(process);
+
+  // Trying to get elected.
+  for (;;) {
+    LOG(INFO) << "Attempting to get elected within " << timeout;
+
+    Future<Option<Log::Position> > future =
+      dispatch(process, &LogWriterProcess::elect);
+
+    if (!future.await(timeout)) {
+      LOG(INFO) << "Timed out while trying to get elected";
+
+      // Cancel the election. It is likely that the election is done
+      // right after the timeout has been reached. In that case, we
+      // may unnecessarily rerun the election, but it is safe.
+      future.discard();
+    } else {
+      if (!future.isReady()) {
+        string failure =
+          future.isFailed() ?
+          future.failure() :
+          "Not expecting discarded future";
+
+        LOG(ERROR) << "Failed to get elected: " << failure;
+        break;
+      } else if (future.get().isNone()) {
+        LOG(INFO) << "Lost an election, but can be retried";
+      } else {
+        LOG(INFO) << "Elected with current position "
+                  << future.get().get().value;
+        return;
+      }
+    }
+
+    if (--retries < 0) {
+      LOG(ERROR) << "Retry limit has been reached during election";
+      break;
     }
+  }
+}
+
+
+Log::Writer::~Writer()
+{
+  terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+Result<Log::Position> Log::Writer::append(
+    const string& data,
+    const Timeout& timeout)
+{
+  LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
+
+  Future<Log::Position> future =
+    dispatch(process, &LogWriterProcess::append, data);
+
+  if (!future.await(timeout.remaining())) {
+    LOG(INFO) << "Timed out while trying to append the log";
 
-    int at = atoi(argv[i]);
-    int to = atoi(argv[i + 1]);
+    future.discard();
+    return None();
+  } else {
+    if (!future.isReady()) {
+      string failure =
+        future.isFailed() ?
+        future.failure() :
+        "Not expecting discarded future";
+
+      LOG(ERROR) << "Failed to append the log: " << failure;
 
-    truncations[at] = to;
+      return Error(failure);
+    } else {
+      return future.get();
+    }
   }
+}
+
+
+Result<Log::Position> Log::Writer::truncate(
+    const Log::Position& to,
+    const Timeout& timeout)
+{
+  LOG(INFO) << "Attempting to truncate the log to " << to.value;
 
-  process::initialize();
+  Future<Log::Position> future =
+    dispatch(process, &LogWriterProcess::truncate, to);
 
-  LogProcess log(quorum, file, servers, znode, end, truncations);
-  spawn(log);
-  wait(log);
+  if (!future.await(timeout.remaining())) {
+    LOG(INFO) << "Timed out while trying to truncate the log";
 
-  return 0;
+    future.discard();
+    return None();
+  } else {
+    if (!future.isReady()) {
+      string failure =
+        future.isFailed() ?
+        future.failure() :
+        "Not expecting discarded future";
+
+      LOG(ERROR) << "Failed to truncate the log: " << failure;
+
+      return Error(failure);
+    } else {
+      return future.get();
+    }
+  }
 }
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
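
On the read side the front end keeps its blocking, Result-based read() (plus
beginning()/ending()), but now dispatches to a LogReaderProcess that waits for
recovery before touching the local replica. A hedged sketch of draining the log,
assuming a Log instance already exists; the helper name and timeout are illustrative
only:

  #include <iostream>
  #include <list>

  #include <process/timeout.hpp>

  #include <stout/duration.hpp>
  #include <stout/result.hpp>

  #include "log/log.hpp"

  using namespace mesos::internal::log;

  // Hypothetical helper, not part of this commit.
  void dump(Log* log)
  {
    Log::Reader reader(log);

    // Both calls block until recovery has finished and the local replica
    // can answer (see the TODOs above about adding timeouts here).
    Log::Position begin = reader.beginning();
    Log::Position end = reader.ending();

    Result<std::list<Log::Entry> > entries =
      reader.read(begin, end, process::Timeout::in(Seconds(10)));

    if (entries.isSome()) {
      std::cout << "Read " << entries.get().size() << " entries" << std::endl;
    } else if (entries.isNone()) {
      std::cout << "Timed out while reading" << std::endl;
    } else {
      std::cout << "Failed to read: " << entries.error() << std::endl;
    }
  }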

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 042f13b..1f0b30d 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -28,15 +28,10 @@
 #include <process/shared.hpp>
 #include <process/timeout.hpp>
 
-#include <stout/check.hpp>
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
+#include <stout/duration.hpp>
 #include <stout/none.hpp>
+#include <stout/option.hpp>
 #include <stout/result.hpp>
-#include <stout/try.hpp>
-
-#include "log/coordinator.hpp"
-#include "log/replica.hpp"
 
 #include "zookeeper/group.hpp"
 
@@ -44,6 +39,12 @@ namespace mesos {
 namespace internal {
 namespace log {
 
+// Forward declarations.
+class LogProcess;
+class LogReaderProcess;
+class LogWriterProcess;
+
+
 class Log
 {
 public:
@@ -98,9 +99,12 @@ public:
 
   private:
     friend class Log;
-    friend class Reader;
     friend class Writer;
+    friend class LogReaderProcess;
+    friend class LogWriterProcess;
+
     Position(uint64_t _value) : value(_value) {}
+
     uint64_t value;
   };
 
@@ -111,8 +115,8 @@ public:
     std::string data;
 
   private:
-    friend class Reader;
-    friend class Writer;
+    friend class LogReaderProcess;
+
     Entry(const Position& _position, const std::string& _data)
       : position(_position), data(_data) {}
   };
@@ -125,9 +129,10 @@ public:
 
     // Returns all entries between the specified positions, unless
     // those positions are invalid, in which case returns an error.
-    Result<std::list<Entry> > read(const Position& from,
-                                   const Position& to,
-                                   const process::Timeout& timeout);
+    Result<std::list<Entry> > read(
+        const Position& from,
+        const Position& to,
+        const process::Timeout& timeout);
 
     // Returns the beginning position of the log from the perspective
     // of the local replica (which may be out of date if the log has
@@ -141,7 +146,7 @@ public:
     Position ending();
 
   private:
-    process::Shared<Replica> replica;
+    LogReaderProcess* process;
   };
 
   class Writer
@@ -171,69 +176,28 @@ public:
         const process::Timeout& timeout);
 
   private:
-    Option<std::string> error;
-    Coordinator coordinator;
+    LogWriterProcess* process;
   };
 
   // Creates a new replicated log that assumes the specified quorum
-  // size, is backed by a file at the specified path, and coordiantes
+  // size, is backed by a file at the specified path, and coordinates
   // with other replicas via the set of process PIDs.
-  Log(int _quorum,
+  Log(int quorum,
       const std::string& path,
-      const std::set<process::UPID>& pids)
-    : group(NULL),
-      executor(NULL),
-      quorum(_quorum),
-      replica(new Replica(path))
-  {
-    GOOGLE_PROTOBUF_VERIFY_VERSION;
-
-    // Add our own replica to the network.
-    Network* _network = new Network(pids);
-    _network->add(replica->pid());
-
-    network.reset(_network);
-  }
+      const std::set<process::UPID>& pids);
 
   // Creates a new replicated log that assumes the specified quorum
-  // size, is backed by a file at the specified path, and coordiantes
+  // size, is backed by a file at the specified path, and coordinates
   // with other replicas associated with the specified ZooKeeper
   // servers, timeout, and znode.
-  Log(int _quorum,
+  Log(int quorum,
       const std::string& path,
       const std::string& servers,
       const Duration& timeout,
       const std::string& znode,
-      const Option<zookeeper::Authentication>& auth = None())
-    : group(new zookeeper::Group(servers, timeout, znode, auth)),
-      executor(new process::Executor()),
-      quorum(_quorum),
-      replica(new Replica(path)),
-      network(new ZooKeeperNetwork(servers, timeout, znode, auth))
-  {
-    GOOGLE_PROTOBUF_VERIFY_VERSION;
+      const Option<zookeeper::Authentication>& auth = None());
 
-    // Need to add our replica to the ZooKeeper group!
-    LOG(INFO) << "Attempting to join replica to ZooKeeper group";
-
-    membership = group->join(replica->pid())
-      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-
-    group->watch()
-      .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
-      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-  }
-
-  ~Log()
-  {
-    network.own().await();
-    replica.own().await();
-
-    delete executor;
-    delete group;
-  }
+  ~Log();
 
   // Returns a position based off of the bytes recovered from
   // Position.identity().
@@ -252,196 +216,13 @@ public:
       ((uint64_t) (bytes[7] & 0xff));
     return Position(value);
   }
-private:
-  friend class Reader;
-  friend class Writer;
-
-  // TODO(benh): Factor this out into some sort of "membership renewer".
-  void watch(const std::set<zookeeper::Group::Membership>& memberships);
-  void failed(const std::string& message) const;
-  void discarded() const;
-
-  // We store a Group instance in order to continually renew the
-  // replicas membership (when using ZooKeeper).
-  zookeeper::Group* group;
-  process::Future<zookeeper::Group::Membership> membership;
-  process::Executor* executor;
-
-  int quorum;
-  process::Shared<Replica> replica;
-  process::Shared<Network> network;
-};
-
-
-Log::Reader::Reader(Log* log)
-  : replica(log->replica) {}
-
-
-Log::Reader::~Reader() {}
-
-
-Result<std::list<Log::Entry> > Log::Reader::read(
-    const Log::Position& from,
-    const Log::Position& to,
-    const process::Timeout& timeout)
-{
-  process::Future<std::list<Action> > actions =
-    replica->read(from.value, to.value);
-
-  if (!actions.await(timeout.remaining())) {
-    return None();
-  } else if (actions.isFailed()) {
-    return Error(actions.failure());
-  }
-
-  CHECK(actions.isReady()) << "Not expecting discarded future!";
-
-  std::list<Log::Entry> entries;
-
-  uint64_t position = from.value;
-
-  foreach (const Action& action, actions.get()) {
-    // Ensure read range is valid.
-    if (!action.has_performed() ||
-        !action.has_learned() ||
-        !action.learned()) {
-      return Error("Bad read range (includes pending entries)");
-    } else if (position++ != action.position()) {
-      return Error("Bad read range (includes missing entries)");
-    }
-
-    // And only return appends.
-    CHECK(action.has_type());
-    if (action.type() == Action::APPEND) {
-      entries.push_back(Entry(action.position(), action.append().bytes()));
-    }
-  }
-
-  return entries;
-}
-
-
-Log::Position Log::Reader::beginning()
-{
-  // TODO(benh): Take a timeout and return an Option.
-  process::Future<uint64_t> value = replica->beginning();
-  value.await();
-  CHECK(value.isReady()) << "Not expecting a failed or discarded future!";
-  return Log::Position(value.get());
-}
-
-
-Log::Position Log::Reader::ending()
-{
-  // TODO(benh): Take a timeout and return an Option.
-  process::Future<uint64_t> value = replica->ending();
-  value.await();
-  CHECK(value.isReady()) << "Not expecting a failed or discarded future!";
-  return Log::Position(value.get());
-}
-
-
-Log::Writer::Writer(Log* log, const Duration& timeout, int retries)
-  : error(None()),
-    coordinator(log->quorum, log->replica, log->network)
-{
-  do {
-    Result<uint64_t> result = coordinator.elect(process::Timeout::in(timeout));
-    if (result.isNone()) {
-      retries--;
-    } else if (result.isSome()) {
-      break;
-    } else {
-      error = result.error();
-      break;
-    }
-  } while (retries > 0);
-}
-
-
-Log::Writer::~Writer()
-{
-  coordinator.demote();
-}
-
-
-Result<Log::Position> Log::Writer::append(
-    const std::string& data,
-    const process::Timeout& timeout)
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
-
-  Result<uint64_t> result = coordinator.append(data, timeout);
-
-  if (result.isError()) {
-    error = result.error();
-    return Error(error.get());
-  } else if (result.isNone()) {
-    return None();
-  }
-
-  CHECK_SOME(result);
-
-  return Log::Position(result.get());
-}
-
-
-Result<Log::Position> Log::Writer::truncate(
-    const Log::Position& to,
-    const process::Timeout& timeout)
-{
-  if (error.isSome()) {
-    return Error(error.get());
-  }
-
-  LOG(INFO) << "Attempting to truncate the log to " << to.value;
-
-  Result<uint64_t> result = coordinator.truncate(to.value, timeout);
-
-  if (result.isError()) {
-    error = result.error();
-    return Error(error.get());
-  } else if (result.isNone()) {
-    return None();
-  }
-
-  CHECK_SOME(result);
-
-  return Log::Position(result.get());
-}
-
-
-void Log::watch(const std::set<zookeeper::Group::Membership>& memberships)
-{
-  if (membership.isReady() && memberships.count(membership.get()) == 0) {
-    // Our replica's membership must have expired, join back up.
-    LOG(INFO) << "Renewing replica group membership";
-    membership = group->join(replica->pid())
-      .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
-      .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-  }
-
-  group->watch(memberships)
-    .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
-    .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
-    .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-}
-
-
-void Log::failed(const std::string& message) const
-{
-  LOG(FATAL) << "Failed to participate in ZooKeeper group: " << message;
-}
 
+private:
+  friend class LogReaderProcess;
+  friend class LogWriterProcess;
 
-void Log::discarded() const
-{
-  LOG(FATAL) << "Not expecting future to get discarded!";
-}
+  LogProcess* process;
+};
 
 } // namespace log {
 } // namespace internal {
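
For reference, the byte handling in Log::position() above is a plain
big-endian reconstruction of a uint64_t from the 8 bytes returned by
Position.identity(). A minimal standalone sketch (the encode() direction is
an assumption; only the decode side appears in this hunk):

    #include <stddef.h>
    #include <stdint.h>

    #include <string>

    // Rebuild a 64-bit position from 8 big-endian bytes, mirroring what
    // Log::position() does with the result of Position.identity().
    inline uint64_t decode(const std::string& bytes)
    {
      uint64_t value = 0;
      for (size_t i = 0; i < 8; i++) {
        value = (value << 8) | (uint64_t) (bytes[i] & 0xff);
      }
      return value;
    }

    // Hypothetical inverse: serialize a position into 8 big-endian bytes.
    inline std::string encode(uint64_t value)
    {
      std::string bytes(8, '\0');
      for (size_t i = 0; i < 8; i++) {
        bytes[7 - i] = (char) ((value >> (8 * i)) & 0xff);
      }
      return bytes;
    }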

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
new file mode 100644
index 0000000..0ab8e95
--- /dev/null
+++ b/src/log/recover.cpp
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "log/catchup.hpp"
+#include "log/recover.hpp"
+
+#include "messages/log.hpp"
+
+using namespace process;
+
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// This process is used to recover a replica. The flow of the recover
+// process is described as follows:
+// A) Check the status of the local replica.
+//    A1) If it is VOTING, exit.
+//    A2) If it is not VOTING, goto (B).
+// B) Broadcast a RecoverRequest to all replicas in the network.
+//    B1) <<< Catch-up >>> If a quorum of replicas are found in VOTING
+//        status (no matter what the status of the local replica is),
+//        set the status of the local replica to RECOVERING, and start
+//        doing catch-up. Once the local replica has caught up, set
+//        the status of the local replica to VOTING and exit.
+//    B2) If a quorum is not found, goto (B).
+//
+// In the following, we list a few scenarios and show how the recover
+// process will respond in those scenarios. All the examples assume a
+// quorum size of 2. Remember that a new replica is always put in
+// EMPTY status initially.
+//
+// 1) Replica A, B and C are all in VOTING status. The operator adds
+//    replica D. In that case, D will go into RECOVERING status and
+//    then go into VOTING status. Therefore, we should avoid adding a
+//    new replica unless we know that one replica has been removed.
+//
+// 2) Replica A and B are in VOTING status. The operator adds replica
+//    C. In that case, C will go into RECOVERING status and then go
+//    into VOTING status, which is expected.
+//
+// 3) Replica A is in VOTING status. The operator adds replica B. In
+//    that case, B will stay in EMPTY status forever. This is expected
+//    because we cannot make progress without enough VOTING replicas
+//    (i.e., fewer than a quorum).
+//
+// 4) Replica A is in VOTING status and B is in EMPTY status. The
+//    operator adds replica C. In that case, C will stay in EMPTY
+//    status forever similar to case 3).
+class RecoverProcess : public Process<RecoverProcess>
+{
+public:
+  RecoverProcess(
+      size_t _quorum,
+      const Owned<Replica>& _replica,
+      const Shared<Network>& _network)
+    : ProcessBase(ID::generate("log-recover")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network) {}
+
+  Future<Owned<Replica> > future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    LOG(INFO) << "Start recovering a replica";
+
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+          static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    // Check the current status of the local replica and decide if
+    // recovery is needed. Recovery is needed if the local replica is
+    // not in VOTING status.
+    replica->status().onAny(defer(self(), &Self::checked, lambda::_1));
+  }
+
+  virtual void finalize()
+  {
+    LOG(INFO) << "Recover process terminated";
+
+    // Cancel all operations if they are still pending.
+    discard(responses);
+    catching.discard();
+  }
+
+private:
+  void checked(const Future<Metadata::Status>& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to get replica status: " + future.failure() :
+          "Not expecting discarded future");
+
+      terminate(self());
+      return;
+    }
+
+    status = future.get();
+
+    LOG(INFO) << "Replica is in " << status << " status";
+
+    if (status == Metadata::VOTING) {
+      promise.set(replica);
+      terminate(self());
+    } else {
+      recover();
+    }
+  }
+
+  void recover()
+  {
+    CHECK_NE(status, Metadata::VOTING);
+
+    // Broadcast recover request to all replicas.
+    network->broadcast(protocol::recover, RecoverRequest())
+      .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+  }
+
+  void broadcasted(const Future<set<Future<RecoverResponse> > >& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to broadcast the recover request: " + future.failure() :
+          "Not expecting discarded future");
+
+      terminate(self());
+      return;
+    }
+
+    responses = future.get();
+
+    if (responses.empty()) {
+      // Retry if no replica is currently in the network.
+      retry();
+    } else {
+      // Instead of using a for loop here, we use select to process
+      // responses one after another so that we can ignore the rest if
+      // we have collected enough responses.
+      select(responses)
+        .onReady(defer(self(), &Self::received, lambda::_1));
+
+      // Reset the counters.
+      responsesReceived.clear();
+      lowestBeginPosition = None();
+      highestEndPosition = None();
+    }
+  }
+
+  void received(const Future<RecoverResponse>& future)
+  {
+    // Enforced by the select semantics.
+    CHECK(future.isReady());
+
+    // Remove this future from 'responses' so that we do not listen on
+    // it the next time we invoke select.
+    responses.erase(future);
+
+    const RecoverResponse& response = future.get();
+
+    LOG(INFO) << "Received a recover response from a replica in "
+              << response.status() << " status";
+
+    responsesReceived[response.status()]++;
+
+    // We need to remember the lowest begin position and highest end
+    // position seen from VOTING replicas.
+    if (response.status() == Metadata::VOTING) {
+      CHECK(response.has_begin() && response.has_end());
+
+      lowestBeginPosition = min(lowestBeginPosition, response.begin());
+      highestEndPosition = max(highestEndPosition, response.end());
+    }
+
+    // If we got responses from a quorum of VOTING replicas, the local
+    // replica will be put in RECOVERING status and start catching up.
+    // It is likely that the local replica is in RECOVERING status
+    // already. This is the case where the replica crashes during
+    // catch-up. When it restarts, we need to recalculate the lowest
+    // begin position and the highest end position since we haven't
+    // persisted this information on disk.
+    if (responsesReceived[Metadata::VOTING] >= quorum) {
+      discard(responses);
+      update(Metadata::RECOVERING);
+      return;
+    }
+
+    if (responses.empty()) {
+      // All responses have been received, but we have neither received
+      // enough responses from VOTING replicas to do catch-up, nor are
+      // we in the start-up case. This is either because we don't have
+      // enough replicas in the network (e.g., a ZooKeeper blip), or we
+      // don't have enough VOTING replicas to proceed. We will retry
+      // the recovery in both cases.
+      retry();
+    } else {
+      // Wait for the next response.
+      select(responses)
+        .onReady(defer(self(), &Self::received, lambda::_1));
+    }
+  }
+
+  void update(const Metadata::Status& _status)
+  {
+    LOG(INFO) << "Updating replica status from "
+              << status << " to " << _status;
+
+    replica->update(_status)
+      .onAny(defer(self(), &Self::updated, _status, lambda::_1));
+  }
+
+  void updated(const Metadata::Status& _status, const Future<bool>& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to update replica status: " + future.failure() :
+          "Not expecting discarded future");
+
+      terminate(self());
+      return;
+    } else if (!future.get()) {
+      promise.fail("Failed to update replica status");
+      terminate(self());
+      return;
+    }
+
+    // The replica status has been updated successfully. Depending on
+    // the new status, we decide what the next action should be.
+    status = _status;
+
+    if (status == Metadata::VOTING) {
+      LOG(INFO) << "Successfully joined the Paxos group";
+
+      promise.set(replica);
+      terminate(self());
+    } else if (status == Metadata::RECOVERING) {
+      catchup();
+    } else {
+      // The replica should not be in any other status.
+      LOG(FATAL) << "Unexpected replica status";
+    }
+  }
+
+  void catchup()
+  {
+    // We reach here either because the log is empty (uninitialized),
+    // or the log is not empty but a previous unfinished catch-up
+    // attempt has been detected (the process crashed or was killed while
+    // catching up). In either case, the local replica may have lost
+    // some data and Paxos states, and should not be allowed to vote.
+    // Otherwise, we may introduce inconsistency in the log as the
+    // local replica could have accepted a write which it would not
+    // have accepted if the data and the Paxos states were not lost.
+    // Now, the question is how many positions the local replica
+    // should catch up before it can be allowed to vote. We find that
+    // it is sufficient to catch up positions from _begin_ to _end_
+    // where _begin_ is the smallest position seen in a quorum of
+    // VOTING replicas and _end_ is the largest position seen in a
+    // quorum of VOTING replicas. Here is the correctness argument.
+    // For a position _e_ larger than _end_, obviously no value has
+    // been agreed on for that position. Otherwise, we would find at
+    // least one VOTING replica in a quorum of replicas whose end
+    // position is larger than _end_. For the same reason, a
+    // coordinator could not have collected enough promises for
+    // position _e_. Therefore, it's safe for the local replica to
+    // vote for that position. For a position _b_ smaller than
+    // _begin_, it should have already been truncated and the
+    // truncation should have already been agreed. Therefore, allowing
+    // the local replica to vote for that position is safe.
+    CHECK(lowestBeginPosition.isSome());
+    CHECK(highestEndPosition.isSome());
+    CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
+
+    uint64_t begin = lowestBeginPosition.get();
+    uint64_t end = highestEndPosition.get();
+
+    set<uint64_t> positions;
+    for (uint64_t p = begin; p <= end; ++p) {
+      positions.insert(p);
+    }
+
+    // Share the ownership of the replica. From this point until the
+    // point where the ownership of the replica is regained, we should
+    // not access the 'replica' field.
+    Shared<Replica> shared = replica.share();
+
+    // Since we do not know what proposal number to use (the log is
+    // empty), we use proposal number 0 and leave log::catchup to
+    // automatically bump the proposal number.
+    catching = log::catchup(quorum, shared, network, 0, positions);
+    catching.onAny(defer(self(), &Self::caughtup, shared, lambda::_1));
+  }
+
+  void caughtup(Shared<Replica> shared, const Future<Nothing>& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to catch-up: " + future.failure() :
+          "Not expecting discarded future");
+
+      terminate(self());
+    } else {
+      // Try to regain the ownership of the replica.
+      shared.own().onAny(defer(self(), &Self::owned, lambda::_1));
+    }
+  }
+
+  void owned(const Future<Owned<Replica> >& future)
+  {
+    if (!future.isReady()) {
+      promise.fail(
+          future.isFailed() ?
+          "Failed to own the replica: " + future.failure() :
+          "Not expecting discarded future");
+
+      terminate(self());
+    } else {
+      // Allow the replica to vote once the catch-up is done.
+      replica = future.get();
+      update(Metadata::VOTING);
+    }
+  }
+
+  void retry()
+  {
+    // We add a random delay before each retry because we do not want
+    // to saturate the network/disk IO in some cases (e.g., network
+    // size is less than quorum). The delay is chosen randomly to
+    // reduce the likelihood of conflicts (i.e., a replica receives a
+    // recover request while it is changing its status).
+    static const Duration T = Milliseconds(500);
+    Duration d = T * (1.0 + (double) ::random() / RAND_MAX);
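+    // Note: assuming ::random() / RAND_MAX stays within [0, 1], 'd' falls
+    // roughly in [T, 2 * T], i.e., between 500ms and 1 second.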
+    delay(d, self(), &Self::recover);
+  }
+
+  const size_t quorum;
+  Owned<Replica> replica;
+  const Shared<Network> network;
+
+  Metadata::Status status;
+  set<Future<RecoverResponse> > responses;
+  hashmap<Metadata::Status, size_t> responsesReceived;
+  Option<uint64_t> lowestBeginPosition;
+  Option<uint64_t> highestEndPosition;
+  Future<Nothing> catching;
+
+  process::Promise<Owned<Replica> > promise;
+};
+
+
+Future<Owned<Replica> > recover(
+    size_t quorum,
+    const Owned<Replica>& replica,
+    const Shared<Network>& network)
+{
+  RecoverProcess* process = new RecoverProcess(quorum, replica, network);
+  Future<Owned<Replica> > future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
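
The decision logic in RecoverProcess::received() above boils down to: tally
responses per status, remember the lowest begin and highest end positions
reported by VOTING replicas, and move to RECOVERING (and start catch-up over
[begin, end]) once a quorum of VOTING responses has arrived. A stripped-down,
synchronous sketch of that bookkeeping (no libprocess futures; all names here
are illustrative only):

    #include <stdint.h>

    #include <algorithm>
    #include <map>
    #include <vector>

    enum Status { EMPTY, STARTING, VOTING, RECOVERING };

    struct Response
    {
      Status status;
      uint64_t begin; // Only meaningful for VOTING replicas.
      uint64_t end;   // Only meaningful for VOTING replicas.
    };

    // Returns true (and fills *begin and *end) once a quorum of VOTING
    // responses has been seen; otherwise returns false, in which case the
    // real process would retry the broadcast.
    bool tally(
        const std::vector<Response>& responses,
        size_t quorum,
        uint64_t* begin,
        uint64_t* end)
    {
      std::map<Status, size_t> received;
      bool seen = false;

      for (size_t i = 0; i < responses.size(); i++) {
        const Response& response = responses[i];
        received[response.status]++;

        if (response.status == VOTING) {
          *begin = seen ? std::min(*begin, response.begin) : response.begin;
          *end = seen ? std::max(*end, response.end) : response.end;
          seen = true;
        }

        if (received[VOTING] >= quorum) {
          return true; // Catch up positions [*begin, *end], then vote.
        }
      }

      return false;
    }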

http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
new file mode 100644
index 0000000..634bc06
--- /dev/null
+++ b/src/log/recover.hpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_RECOVER_HPP__
+#define __LOG_RECOVER_HPP__
+
+#include <stdint.h>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/shared.hpp>
+
+#include <stout/nothing.hpp>
+
+#include "log/network.hpp"
+#include "log/replica.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Recovers a replica by catching up enough missing positions. A
+// replica that starts with an empty log (e.g., in the case of a disk
+// failure) should not be allowed to vote. Otherwise, the new votes it
+// makes may contradict its lost votes, leading to potential
+// inconsistency in the log. Instead, the replica should be put in
+// non-voting status and catch up missing positions (and associated
+// Paxos states). The replica can be re-allowed to vote once the
+// following two conditions are met: 1) a sufficient number of missing
+// positions is recovered such that if other replicas fail, the
+// remaining replicas can restore all the successfully written log
+// entries; 2) its future votes cannot contradict its lost votes.
+// This function returns an owned pointer to the recovered replica if
+// the recovery is successful.
+extern process::Future<process::Owned<Replica> > recover(
+    size_t quorum,
+    const process::Owned<Replica>& replica,
+    const process::Shared<Network>& network);
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_RECOVER_HPP__
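
A hedged usage sketch of this interface follows. The Replica(path)
constructor is taken from the removed Log code earlier in this commit; how a
Network is constructed is not shown in this diff, so the PID-based
constructor below is an assumption, as are the path and quorum size:

    #include <set>

    #include <process/future.hpp>
    #include <process/owned.hpp>
    #include <process/pid.hpp>
    #include <process/shared.hpp>

    #include "log/network.hpp"
    #include "log/recover.hpp"
    #include "log/replica.hpp"

    using namespace mesos::internal::log;

    void example(const std::set<process::UPID>& pids)
    {
      // Replica(path) as used by the removed Log constructor above.
      process::Owned<Replica> replica(
          new Replica("/tmp/replicated_log"));

      // Assumption: a Network seeded with the other replicas' PIDs.
      process::Shared<Network> network(new Network(pids));

      // Recover with a quorum size of 2; the future is satisfied with an
      // owned pointer to the replica once it reaches VOTING status.
      process::Future<process::Owned<Replica> > recovered =
        recover(2, replica, network);

      // Block for simplicity; real callers would defer/compose instead.
      recovered.await();

      if (recovered.isReady()) {
        // recovered.get() can now participate in the replicated log.
      }
    }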