Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:25 UTC
[05/10] git commit: Added log recovery support.
Added log recovery support.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/15802
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9b60c4c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9b60c4c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9b60c4c
Branch: refs/heads/master
Commit: f9b60c4cac503b2d7a1c0cb5403203e619e563e6
Parents: 6ea7c14
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:53:40 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:53:40 2014 -0800
----------------------------------------------------------------------
src/Makefile.am           |    3 +
src/common/type_utils.hpp |   25 +
src/log/coordinator.cpp   |  121 +---
src/log/coordinator.hpp   |   42 +-
src/log/log.cpp           | 1192 ++++++++++++++++++++++++----------------
src/log/log.hpp           |  283 ++--------
src/log/recover.cpp       |  403 ++++++++++++++
src/log/recover.hpp       |   59 ++
src/log/replica.cpp       |  214 ++++++--
src/log/replica.hpp       |   13 +-
src/messages/log.proto    |   18 +-
src/tests/log_tests.cpp   |  419 ++++++++------
12 files changed, 1727 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 17fbf83..60fcb31 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -301,11 +301,14 @@ liblog_la_SOURCES = \
log/catchup.cpp \
log/consensus.cpp \
log/coordinator.cpp \
+ log/log.cpp \
+ log/recover.cpp \
log/replica.cpp
liblog_la_SOURCES += \
log/catchup.hpp \
log/consensus.hpp \
log/coordinator.hpp \
+ log/recover.hpp \
log/replica.hpp \
log/log.hpp \
log/network.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/common/type_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.hpp b/src/common/type_utils.hpp
index 3b05751..fe6bf71 100644
--- a/src/common/type_utils.hpp
+++ b/src/common/type_utils.hpp
@@ -30,6 +30,7 @@
#include "common/attributes.hpp"
+#include "messages/log.hpp"
#include "messages/messages.hpp"
// This file includes definitions for operators on protobuf classes
@@ -371,4 +372,28 @@ inline std::ostream& operator << (
}} // namespace mesos { namespace internal {
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const Action::Type& type)
+{
+ return stream << Action::Type_Name(type);
+}
+
+
+inline std::ostream& operator << (
+ std::ostream& stream,
+ const Metadata::Status& status)
+{
+ return stream << Metadata::Status_Name(status);
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos
+
#endif // __TYPE_UTILS_HPP__
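
The two operator<< overloads above make the protobuf enums streamable,
which is what enables the simplified logging in coordinator.cpp below.
A minimal sketch of the effect, assuming an Action message as defined
in messages/log.proto:

  Action action;
  action.set_type(Action::APPEND);

  // Before this commit, call sites converted the enum by hand:
  LOG(INFO) << Action::Type_Name(action.type());

  // With the overload above, the conversion is implicit:
  LOG(INFO) << action.type();  // prints "APPEND"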
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/coordinator.cpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp
index 21f2865..bc85e66 100644
--- a/src/log/coordinator.cpp
+++ b/src/log/coordinator.cpp
@@ -20,13 +20,13 @@
#include <process/defer.hpp>
#include <process/dispatch.hpp>
-#include <process/future.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
-#include <stout/error.hpp>
#include <stout/none.hpp>
+#include "common/type_utils.hpp"
+
#include "log/catchup.hpp"
#include "log/consensus.hpp"
#include "log/coordinator.hpp"
@@ -59,22 +59,10 @@ public:
virtual ~CoordinatorProcess() {}
- // Handles coordinator election. Returns the last committed log
- // position if the operation succeeds. Returns none if the election
- // is not successful, but can be retried.
+ // See comments in 'coordinator.hpp'.
Future<Option<uint64_t> > elect();
-
- // Handles coordinator demotion. Returns the last committed log
- // position if the operation succeeds.
Future<uint64_t> demote();
-
- // Appends the specified bytes to the end of the log. Returns the
- // position of the appended entry if the operation succeeds.
Future<uint64_t> append(const string& bytes);
-
- // Removes all log entries preceding the log entry at the given
- // position (to). Returns the position at which the truncate
- // operation is written if the operation succeeds.
Future<uint64_t> truncate(uint64_t to);
protected:
@@ -344,8 +332,7 @@ Future<uint64_t> CoordinatorProcess::truncate(uint64_t to)
Future<uint64_t> CoordinatorProcess::write(const Action& action)
{
- LOG(INFO) << "Coordinator attempting to write "
- << Action::Type_Name(action.type())
+ LOG(INFO) << "Coordinator attempting to write " << action.type()
<< " action at position " << action.position();
CHECK_EQ(state, ELECTED);
@@ -455,111 +442,27 @@ Coordinator::~Coordinator()
}
-Result<uint64_t> Coordinator::elect(const Timeout& timeout)
+Future<Option<uint64_t> > Coordinator::elect()
{
- LOG(INFO) << "Coordinator attempting to get elected within "
- << timeout.remaining();
-
- Future<Option<uint64_t> > electing =
- dispatch(process, &CoordinatorProcess::elect);
-
- electing.await(timeout.remaining());
-
- CHECK(!electing.isDiscarded());
-
- if (electing.isPending()) {
- LOG(INFO) << "Coordinator timed out while trying to get elected";
-
- electing.discard();
- return None();
- } else if (electing.isFailed()) {
- LOG(ERROR) << "Coordinator failed to get elected: "
- << electing.failure();
-
- return Error(electing.failure());
- } else {
- if (electing.get().isNone()) {
- LOG(INFO) << "Coordinator lost an election, but can be retried";
-
- return None();
- } else {
- LOG(INFO) << "Coordinator elected with current position "
- << electing.get().get();
-
- return electing.get().get();
- }
- }
+ return dispatch(process, &CoordinatorProcess::elect);
}
-Result<uint64_t> Coordinator::demote()
+Future<uint64_t> Coordinator::demote()
{
- Future<uint64_t> demoting =
- dispatch(process, &CoordinatorProcess::demote);
-
- demoting.await(); // TODO(jieyu): Use a timeout.
-
- CHECK(!demoting.isDiscarded());
-
- if (demoting.isFailed()) {
- return Error(demoting.failure());
- } else {
- return demoting.get();
- }
+ return dispatch(process, &CoordinatorProcess::demote);
}
-Result<uint64_t> Coordinator::append(
- const string& bytes,
- const Timeout& timeout)
+Future<uint64_t> Coordinator::append(const string& bytes)
{
- Future<uint64_t> appending =
- dispatch(process, &CoordinatorProcess::append, bytes);
-
- appending.await(timeout.remaining());
-
- CHECK(!appending.isDiscarded());
-
- if (appending.isPending()) {
- LOG(INFO) << "Coordinator timed out while trying to append";
-
- appending.discard();
- return None();
- } else if (appending.isFailed()) {
- LOG(ERROR) << "Coordinator failed to append the log: "
- << appending.failure();
-
- return Error(appending.failure());
- } else {
- return appending.get();
- }
+ return dispatch(process, &CoordinatorProcess::append, bytes);
}
-Result<uint64_t> Coordinator::truncate(
- uint64_t to,
- const Timeout& timeout)
+Future<uint64_t> Coordinator::truncate(uint64_t to)
{
- Future<uint64_t> truncating =
- dispatch(process, &CoordinatorProcess::truncate, to);
-
- truncating.await(timeout.remaining());
-
- CHECK(!truncating.isDiscarded());
-
- if (truncating.isPending()) {
- LOG(INFO) << "Coordinator timed out while trying to truncate";
-
- truncating.discard();
- return None();
- } else if (truncating.isFailed()) {
- LOG(ERROR) << "Coordinator failed to truncate the log: "
- << truncating.failure();
-
- return Error(truncating.failure());
- } else {
- return truncating.get();
- }
+ return dispatch(process, &CoordinatorProcess::truncate, to);
}
} // namespace log {
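
The rewritten methods follow the standard libprocess idiom: the public
class owns a Process, and every operation is a non-blocking dispatch
returning a Future, leaving timeouts and retries to the caller (see
Log::Writer in log.cpp below). A self-contained sketch of the idiom
with a hypothetical counter, not taken from this commit:

  #include <process/dispatch.hpp>
  #include <process/future.hpp>
  #include <process/process.hpp>

  class CounterProcess : public process::Process<CounterProcess>
  {
  public:
    CounterProcess() : count(0) {}

    // Runs serialized within the process; no locking needed.
    process::Future<int> increment() { return ++count; }

  private:
    int count;
  };

  class Counter
  {
  public:
    Counter() : process(new CounterProcess()) { process::spawn(process); }

    ~Counter()
    {
      process::terminate(process);
      process::wait(process);
      delete process;
    }

    // Returns immediately; the result arrives asynchronously.
    process::Future<int> increment()
    {
      return process::dispatch(process, &CounterProcess::increment);
    }

  private:
    CounterProcess* process;
  };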
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/coordinator.hpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.hpp b/src/log/coordinator.hpp
index 43cb530..35b68e9 100644
--- a/src/log/coordinator.hpp
+++ b/src/log/coordinator.hpp
@@ -23,10 +23,10 @@
#include <string>
+#include <process/future.hpp>
#include <process/shared.hpp>
-#include <process/timeout.hpp>
-#include <stout/result.hpp>
+#include <stout/option.hpp>
#include "log/network.hpp"
#include "log/replica.hpp"
@@ -49,25 +49,25 @@ public:
~Coordinator();
- // Handles coordinator election/demotion. A result of none means the
- // coordinator failed to achieve a quorum (e.g., due to timeout) but
- // can be retried. A some result returns the last committed log
- // position.
- Result<uint64_t> elect(const process::Timeout& timeout);
- Result<uint64_t> demote();
-
- // Returns the result of trying to append the specified bytes. A
- // result of none means the append failed (e.g., due to timeout),
- // but can be retried.
- Result<uint64_t> append(
- const std::string& bytes,
- const process::Timeout& timeout);
-
- // Returns the result of trying to truncate the log (from the
- // beginning to the specified position exclusive). A result of
- // none means the truncate failed (e.g., due to timeout), but can be
- // retried.
- Result<uint64_t> truncate(uint64_t to, const process::Timeout& timeout);
+ // Handles coordinator election. Returns the last committed (a.k.a.,
+ // learned) log position if the operation succeeds. Returns none if
+ // the election is not successful, but can be retried.
+ process::Future<Option<uint64_t> > elect();
+
+ // Handles coordinator demotion. Returns the last committed (a.k.a.,
+ // learned) log position if the operation succeeds. One should only
+ // call this function if the coordinator has been elected, and no
+ // write (append or truncate) is in progress.
+ process::Future<uint64_t> demote();
+
+ // Appends the specified bytes to the end of the log. Returns the
+ // position of the appended entry if the operation succeeds.
+ process::Future<uint64_t> append(const std::string& bytes);
+
+ // Removes all log entries preceding the log entry at the given
+ // position (to). Returns the position at which the truncate
+ // operation is written if the operation succeeds.
+ process::Future<uint64_t> truncate(uint64_t to);
private:
CoordinatorProcess* process;
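
The truncate() comment above is easy to misread: 'to' is exclusive,
and the truncate itself is an action written to the log, so it
consumes a position. A worked sketch with illustrative positions
(futures elided; assume an elected coordinator on a fresh log whose
last committed position is 0):

  coordinator.append("a");   // written at position 1
  coordinator.append("b");   // written at position 2
  coordinator.append("c");   // written at position 3
  coordinator.truncate(3);   // entries before position 3 are removed;
                             // returns 4, the position at which the
                             // truncate action itself is written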
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/log.cpp
----------------------------------------------------------------------
diff --git a/src/log/log.cpp b/src/log/log.cpp
index d057925..e83f822 100644
--- a/src/log/log.cpp
+++ b/src/log/log.cpp
@@ -16,639 +16,893 @@
* limitations under the License.
*/
-// TODO(benh): Optimize LearnedMessage (and the "commit" stage in
-// general) by figuring out a way to not send the entire action
-// contents a second time (should cut bandwidth used in half).
-
-// TODO(benh): Provide a LearnRequest that requests more than one
-// position at a time, and a LearnResponse that returns as many
-// positions as it knows.
-
-// TODO(benh): Implement background catchup: have a new replica that
-// comes online become part of the group but don't respond to promises
-// or writes until it has caught up! The advantage to becoming part of
-// the group is that the new replica can see where the end of the log
-// is in order to continue to catch up.
-
-// TODO(benh): Add tests that deliberatly put the system in a state of
-// inconsistency by doing funky things to the underlying logs. Figure
-// out ways of bringing new replicas online that seem to check the
-// consistency of the other replicas.
-
-#include <list>
-#include <map>
-#include <set>
-#include <string>
-#include <vector>
-
-#include <boost/lexical_cast.hpp>
-
+#include <process/defer.hpp>
#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
#include <process/process.hpp>
-#include <process/run.hpp>
+#include <process/shared.hpp>
#include <stout/check.hpp>
-#include <stout/duration.hpp>
-#include <stout/fatal.hpp>
+#include <stout/error.hpp>
#include <stout/foreach.hpp>
-#include <stout/os.hpp>
-#include <stout/result.hpp>
-
-#include "zookeeper/zookeeper.hpp"
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/set.hpp>
#include "log/coordinator.hpp"
+#include "log/log.hpp"
+#include "log/network.hpp"
+#include "log/recover.hpp"
#include "log/replica.hpp"
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::log;
-
using namespace process;
using std::list;
-using std::map;
-using std::pair;
using std::set;
using std::string;
-using std::vector;
+namespace mesos {
+namespace internal {
+namespace log {
-// class Drop : public Filter
-// {
-// public:
-// Drop
-// virtual bool filter(Message* message)
-// {
-// return == message->name;
-// }
-// };
+class LogProcess : public Process<LogProcess>
+{
+public:
+ LogProcess(
+ size_t _quorum,
+ const string& path,
+ const set<UPID>& pids);
+
+ LogProcess(
+ size_t _quorum,
+ const string& path,
+ const string& servers,
+ const Duration& timeout,
+ const string& znode,
+ const Option<zookeeper::Authentication>& auth);
+
+ // Recovers the log by catching up if needed. Returns a shared
+ // pointer to the local replica if the recovery succeeds.
+ Future<Shared<Replica> > recover();
+protected:
+ virtual void initialize();
+ virtual void finalize();
-// class PeriodicFilter
+private:
+ friend class LogReaderProcess;
+ friend class LogWriterProcess;
+ // Continuations.
+ void _recover();
-char** args; // Command line arguments for doing a restart.
+ // TODO(benh): Factor this out into "membership renewer".
+ void watch(
+ const UPID& pid,
+ const set<zookeeper::Group::Membership>& memberships);
+ void failed(const string& message);
+ void discarded();
-void restart()
-{
- LOG(INFO) << "Restarting ...";
- execv(args[0], args);
- fatalerror("Failed to exec");
-}
+ const size_t quorum;
+ Shared<Replica> replica;
+ Shared<Network> network;
+ // For replica recovery.
+ Option<Future<Owned<Replica> > > recovering;
+ process::Promise<Nothing> recovered;
+ list<process::Promise<Shared<Replica> >*> promises;
-bool coordinate(Coordinator* coordinator,
- uint64_t id,
- int end,
- map<int, int> truncations)
+ // For renewing membership. We store a Group instance in order to
+ // continually renew the replica's membership (when using ZooKeeper).
+ zookeeper::Group* group;
+ Future<zookeeper::Group::Membership> membership;
+};
+
+
+class LogReaderProcess : public Process<LogReaderProcess>
{
- const int attempts = 3;
+public:
+ LogReaderProcess(Log* log);
- uint64_t index;
+ Future<Log::Position> beginning();
+ Future<Log::Position> ending();
- int attempt = 1;
- while (true) {
- Result<uint64_t> result = coordinator->elect(id);
- if (result.isError()) {
- restart();
- } else if (result.isNone()) {
- if (attempt == attempts) {
- restart();
- } else {
- attempt++;
- os::sleep(Seconds(1));
- }
- } else {
- CHECK_SOME(result);
- index = result.get();
- break;
- }
- }
+ Future<list<Log::Entry> > read(
+ const Log::Position& from,
+ const Log::Position& to);
- uint64_t value = 0;
-
- if (index != 0) {
- attempt = 1;
- while (true) {
- Result<list<pair<uint64_t, string> > > result =
- coordinator->read(index, index);
- if (result.isError()) {
- LOG(INFO) << "Restarting due to read error";
- restart();
- } else if (result.isNone()) {
- if (attempt == attempts) {
- LOG(INFO) << "Restarting after too many attempts";
- restart();
- } else {
- attempt++;
- os::sleep(Seconds(1));
- }
- } else {
- CHECK_SOME(result);
- const list<pair<uint64_t, string> >& list = result.get();
- if (list.size() != 1) {
- index--;
- } else {
- try {
- value = boost::lexical_cast<uint64_t>(list.front().second);
- } catch (boost::bad_lexical_cast&) {
- LOG(INFO) << "Restarting due to conversion error";
- restart();
- }
- break;
- }
- }
- }
- }
+protected:
+ virtual void initialize();
+ virtual void finalize();
- value++;
-
- srand(time(NULL));
-
- int writes = rand() % 500;
-
- LOG(INFO) << "Attempting to do " << writes << " writes";
-
- attempt = 1;
- while (writes > 0 && value <= end) {
- if (truncations.count(value) > 0) {
- int to = truncations[value];
- Result<uint64_t> result = coordinator->truncate(to);
- if (result.isError()) {
- LOG(INFO) << "Restarting due to truncate error";
- restart();
- } else if (result.isNone()) {
- if (attempt == attempts) {
- LOG(INFO) << "Restarting after too many attempts";
- restart();
- } else {
- attempt++;
- os::sleep(Seconds(1));
- continue;
- }
- } else {
- CHECK_SOME(result);
- LOG(INFO) << "Truncated to " << to;
- os::sleep(Seconds(1));
- attempt = 1;
- }
- }
+private:
+ // Returns a position from a raw value.
+ static Log::Position position(uint64_t value);
- Result<uint64_t> result = coordinator->append(stringify(value));
- if (result.isError()) {
- LOG(INFO) << "Restarting due to append error";
- restart();
- } else if (result.isNone()) {
- if (attempt == attempts) {
- LOG(INFO) << "Restarting after too many attempts";
- restart();
- } else {
- attempt++;
- os::sleep(Seconds(1));
- }
- } else {
- CHECK_SOME(result);
- LOG(INFO) << "Wrote " << value;
- os::sleep(Seconds(1));
- writes--;
- value++;
- attempt = 1;
- }
- }
+ // Returns a future which gets set when the log recovery has
+ // finished (either succeeded or failed).
+ Future<Nothing> recover();
- exit(0);
- return true;
-}
+ // Continuations.
+ void _recover();
+ Future<Log::Position> _beginning();
+ Future<Log::Position> _ending();
-class LogProcess : public Process<LogProcess>
+ Future<list<Log::Entry> > _read(
+ const Log::Position& from,
+ const Log::Position& to);
+
+ Future<list<Log::Entry> > __read(
+ const Log::Position& from,
+ const Log::Position& to,
+ const list<Action>& actions);
+
+ Future<Shared<Replica> > recovering;
+ list<process::Promise<Nothing>*> promises;
+};
+
+
+class LogWriterProcess : public Process<LogWriterProcess>
{
public:
- LogProcess(int _quorum,
- const string& _file,
- const string& _servers,
- const string& _znode,
- int _end,
- const map<int, int>& _truncations);
-
- virtual ~LogProcess();
-
- // ZooKeeper events. TODO(*): Use a ZooKeeper listener?
- void connected();
- void reconnecting();
- void reconnected();
- void expired();
- void updated(const string& path);
+ LogWriterProcess(Log* log);
+
+ Future<Option<Log::Position> > elect();
+ Future<Log::Position> append(const string& bytes);
+ Future<Log::Position> truncate(const Log::Position& to);
protected:
- virtual void initialze();
+ virtual void initialize();
+ virtual void finalize();
private:
- // Updates the group.
- void regroup();
+ // Returns a position from a raw value.
+ static Log::Position position(uint64_t value);
- // Runs an election.
- void elect();
+ // Returns a future which gets set when the log recovery has
+ // finished (either succeeded or failed).
+ Future<Nothing> recover();
- // ZooKeeper bits and pieces.
- string servers;
- string znode;
- ZooKeeper* zk;
- Watcher* watcher;
+ // Continuations.
+ void _recover();
- // Size of quorum.
- int quorum;
+ Future<Option<Log::Position> > _elect();
+ Option<Log::Position> __elect(const Option<uint64_t>& result);
- // Log file.
- string file;
+ void failed(const string& message);
- // Termination value (when to stop writing to the log).
- int end;
+ const size_t quorum;
+ const Shared<Network> network;
- // Truncation points.
- map<int, int> truncations;
+ Future<Shared<Replica> > recovering;
+ list<process::Promise<Nothing>*> promises;
- // Coordinator id.
- uint64_t id;
+ Coordinator* coordinator;
+ Option<string> error;
+};
- // Whether or not the coordinator has been elected.
- bool elected;
- // Group members.
- set<UPID> members;
+/////////////////////////////////////////////////
+// Implementation of LogProcess.
+/////////////////////////////////////////////////
- ReplicaProcess* replica;
- GroupProcess* group;
- Coordinator* coordinator;
-};
+
+LogProcess::LogProcess(
+ size_t _quorum,
+ const string& path,
+ const set<UPID>& pids)
+ : ProcessBase(ID::generate("log")),
+ quorum(_quorum),
+ replica(new Replica(path)),
+ network(new Network(pids + (UPID) replica->pid())),
+ group(NULL) {}
+
+
+LogProcess::LogProcess(
+ size_t _quorum,
+ const string& path,
+ const string& servers,
+ const Duration& timeout,
+ const string& znode,
+ const Option<zookeeper::Authentication>& auth)
+ : ProcessBase(ID::generate("log")),
+ quorum(_quorum),
+ replica(new Replica(path)),
+ network(new ZooKeeperNetwork(servers, timeout, znode, auth)),
+ group(new zookeeper::Group(servers, timeout, znode, auth)) {}
-class LogProcessWatcher : public Watcher
+void LogProcess::initialize()
{
-public:
- LogProcessWatcher(const PID<LogProcess>& _pid)
- : pid(_pid), reconnect(false) {}
-
- virtual ~LogProcessWatcher() {}
-
- virtual void process(ZooKeeper* zk, int type, int state, const string& path)
- {
- if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
- // Check if this is a reconnect.
- if (!reconnect) {
- // Initial connect.
- dispatch(pid, &LogProcess::connected);
- } else {
- // Reconnected.
- dispatch(pid, &LogProcess::reconnected);
- }
- } else if ((state == ZOO_CONNECTING_STATE) &&
- (type == ZOO_SESSION_EVENT)) {
- // The client library automatically reconnects, taking into
- // account failed servers in the connection string,
- // appropriately handling the "herd effect", etc.
- reconnect = true;
- dispatch(pid, &LogProcess::reconnecting);
- } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
- (type == ZOO_SESSION_EVENT)) {
- dispatch(pid, &LogProcess::expired);
-
- // If this watcher is reused, the next connect won't be a reconnect.
- reconnect = false;
- } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHILD_EVENT)) {
- dispatch(pid, &LogProcess::updated, path);
- } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
- dispatch(pid, &LogProcess::updated, path);
- } else {
- LOG(FATAL) << "Unimplemented ZooKeeper event: (state is "
- << state << " and type is " << type << ")";
- }
+ if (group != NULL) {
+ // Need to add our replica to the ZooKeeper group!
+ LOG(INFO) << "Attempting to join replica to ZooKeeper group";
+
+ membership = group->join(replica->pid())
+ .onFailed(defer(self(), &Self::failed, lambda::_1))
+ .onDiscarded(defer(self(), &Self::discarded));
+
+ // We save and pass the pid of the replica to the 'watch' function
+ // because the field member 'replica' is not available during
+ // recovery. We need the pid to renew the replica's membership.
+ group->watch()
+ .onReady(defer(self(), &Self::watch, replica->pid(), lambda::_1))
+ .onFailed(defer(self(), &Self::failed, lambda::_1))
+ .onDiscarded(defer(self(), &Self::discarded));
}
-private:
- const PID<LogProcess> pid;
- bool reconnect;
-};
+ // Start the recovery.
+ recover();
+}
+
+
+void LogProcess::finalize()
+{
+ if (recovering.isSome()) {
+ // Stop the recovery if it is still pending.
+ recovering.get().discard();
+ }
+ // If there exist operations that are gated by the recovery, we fail
+ // all of them because the log is being deleted.
+ foreach (process::Promise<Shared<Replica> >* promise, promises) {
+ promise->fail("Log is being deleted");
+ delete promise;
+ }
+ promises.clear();
-LogProcess::LogProcess(int _quorum,
- const string& _file,
- const string& _servers,
- const string& _znode,
- int _end,
- const map<int, int>& _truncations)
- : quorum(_quorum),
- file(_file),
- servers(_servers),
- znode(_znode),
- end(_end),
- truncations(_truncations),
- id(0),
- elected(false),
- replica(NULL),
- group(NULL),
- coordinator(NULL) {}
-
-
-LogProcess::~LogProcess()
-{
- delete zk;
- delete watcher;
- delete replica;
delete group;
- delete coordinator;
+
+ // Wait for the shared pointers 'network' and 'replica' to become
+ // unique (i.e., no other references to them). These calls should
+ // not block for long because, at this moment, all operations should
+ // have been cancelled or are being cancelled. We do this to make
+ // sure that after the log is deleted, all operations associated
+ // with this log are terminated.
+ network.own().await();
+ replica.own().await();
}
-void LogProcess::connected()
+Future<Shared<Replica> > LogProcess::recover()
{
- LOG(INFO) << "Log connected to ZooKeeper";
+ // The future 'recovered' is used to mark the success (or the
+ // failure) of the recovery. We do not use the future 'recovering'
+ // to do that because it can be set in another process and is thus
+ // subject to a race condition we want to avoid. We deliberately do
+ // not save the replica in 'recovered' because doing so would
+ // complicate our deletion logic (see 'finalize').
+ Future<Nothing> future = recovered.future();
+
+ if (future.isDiscarded()) {
+ return Failure("Not expecting discarded future");
+ } else if (future.isFailed()) {
+ return Failure(future.failure());
+ } else if (future.isReady()) {
+ return replica;
+ }
+
+ // Recovery has not finished yet. Create a promise and queue it such
+ // that it can get notified once the recovery has finished (either
+ // succeeded or failed).
+ process::Promise<Shared<Replica> >* promise =
+ new process::Promise<Shared<Replica> >();
+
+ promises.push_back(promise);
+
+ if (recovering.isNone()) {
+ // TODO(jieyu): At this moment, we haven't shared 'replica' with
+ // others yet. Therefore, the following 'replica.own()' call
+ // should not be blocking. In the future, we may want to support
+ // 'release' in Shared, which would provide this CHECK internally.
+ CHECK(replica.unique());
+
+ recovering = log::recover(quorum, replica.own().get(), network)
+ .onAny(defer(self(), &Self::_recover));
+ }
+
+ return promise->future();
+}
+
- int ret;
- string result;
+void LogProcess::_recover()
+{
+ CHECK_SOME(recovering);
- // Assume the znode that was created does not end with a "/".
- CHECK(znode.size() == 0 || znode.at(znode.size() - 1) != '/');
+ Future<Owned<Replica> > future = recovering.get();
- // Create directory path znodes as necessary.
- size_t index = znode.find("/", 0);
+ if (!future.isReady()) {
+ // The 'future' here can only be discarded in 'finalize'.
+ string failure = future.isFailed() ?
+ future.failure() :
+ "The future 'recovering' is unexpectedly discarded";
- while (index < string::npos) {
- // Get out the prefix to create.
- index = znode.find("/", index + 1);
- string prefix = znode.substr(0, index);
+ // Mark the failure of the recovery.
+ recovered.fail(failure);
- LOG(INFO) << "Log trying to create znode '"
- << prefix << "' in ZooKeeper";
+ foreach (process::Promise<Shared<Replica> >* promise, promises) {
+ promise->fail(failure);
+ delete promise;
+ }
+ promises.clear();
+ } else {
+ replica = future.get().share();
- // Create the node (even if it already exists).
- ret = zk->create(
- prefix,
- "",
- ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- 0,
- &result);
+ // Mark the success of the recovery.
+ recovered.set(Nothing());
- if (ret != ZOK && ret != ZNODEEXISTS) {
- LOG(FATAL) << "Failed to create '" << prefix
- << "' in ZooKeeper: " << zk->message(ret);
+ foreach (process::Promise<Shared<Replica> >* promise, promises) {
+ promise->set(replica);
+ delete promise;
}
+ promises.clear();
}
+}
- // Now create the "replicas" znode.
- LOG(INFO) << "Log trying to create znode '" << znode
- << "/replicas" << "' in ZooKeeper";
- // Create the node (even if it already exists).
- ret = zk->create(znode + "/replicas", "", ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- 0, &result);
+void LogProcess::watch(
+ const UPID& pid,
+ const set<zookeeper::Group::Membership>& memberships)
+{
+ if (membership.isReady() && memberships.count(membership.get()) == 0) {
+ // Our replica's membership must have expired, join back up.
+ LOG(INFO) << "Renewing replica group membership";
- if (ret != ZOK && ret != ZNODEEXISTS) {
- LOG(FATAL) << "Failed to create '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
+ membership = group->join(pid)
+ .onFailed(defer(self(), &Self::failed, lambda::_1))
+ .onDiscarded(defer(self(), &Self::discarded));
}
- // Now create the "coordinators" znode.
- LOG(INFO) << "Log trying to create znode '" << znode
- << "/coordinators" << "' in ZooKeeper";
+ group->watch(memberships)
+ .onReady(defer(self(), &Self::watch, pid, lambda::_1))
+ .onFailed(defer(self(), &Self::failed, lambda::_1))
+ .onDiscarded(defer(self(), &Self::discarded));
+}
- // Create the node (even if it already exists).
- ret = zk->create(znode + "/coordinators", "", ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- 0, &result);
- if (ret != ZOK && ret != ZNODEEXISTS) {
- LOG(FATAL) << "Failed to create '" << znode << "/coordinators"
- << "' in ZooKeeper: " << zk->message(ret);
- }
+void LogProcess::failed(const string& message)
+{
+ LOG(FATAL) << "Failed to participate in ZooKeeper group: " << message;
+}
- // Okay, create our replica, group, and coordinator.
- replica = new ReplicaProcess(file);
- spawn(replica);
- group = new GroupProcess();
- spawn(group);
+void LogProcess::discarded()
+{
+ LOG(FATAL) << "Not expecting future to get discarded!";
+}
- coordinator = new Coordinator(quorum, replica, group);
- // Set a watch on the replicas.
- ret = zk->getChildren(znode + "/replicas", true, NULL);
+/////////////////////////////////////////////////
+// Implementation of LogReaderProcess.
+/////////////////////////////////////////////////
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
- }
- // Set a watch on the coordinators.
- ret = zk->getChildren(znode + "/coordinators", true, NULL);
+LogReaderProcess::LogReaderProcess(Log* log)
+ : ProcessBase(ID::generate("log-reader")),
+ recovering(dispatch(log->process, &LogProcess::recover)) {}
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
- }
- // Add an ephemeral znode for our replica and coordinator.
- ret = zk->create(znode + "/replicas/", replica->self(), ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
+void LogReaderProcess::initialize()
+{
+ recovering.onAny(defer(self(), &Self::_recover));
+}
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to create an ephmeral node at '" << znode
- << "/replica/" << "' in ZooKeeper: " << zk->message(ret);
+
+void LogReaderProcess::finalize()
+{
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->fail("Log reader is being deleted");
+ delete promise;
}
+ promises.clear();
+}
- ret = zk->create(znode + "/coordinators/", "", ZOO_OPEN_ACL_UNSAFE,
- // ZOO_CREATOR_ALL_ACL, // needs authentication
- ZOO_SEQUENCE | ZOO_EPHEMERAL, &result);
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to create an ephmeral node at '" << znode
- << "/replica/" << "' in ZooKeeper: " << zk->message(ret);
+Future<Nothing> LogReaderProcess::recover()
+{
+ if (recovering.isReady()) {
+ return Nothing();
+ } else if (recovering.isFailed()) {
+ return Failure(recovering.failure());
+ } else if (recovering.isDiscarded()) {
+ return Failure("The future 'recovering' is unexpectedly discarded");
}
- // Save the sequence id but only grab the basename, e.g.,
- // "/path/to/znode/000000131" => "000000131".
- result = utils::os::basename(result);
+ // At this moment, the future 'recovering' should most likely be
+ // pending. But it is also possible that it gets set right after the
+ // above checks. Either way, we know that the continuation '_recover' has
+ // not been called yet (otherwise, we should not be able to reach
+ // here). The promise we are creating below will be properly
+ // set/failed when '_recover' is called.
+ process::Promise<Nothing>* promise = new process::Promise<Nothing>();
+ promises.push_back(promise);
+ return promise->future();
+}
+
- try {
- id = boost::lexical_cast<uint64_t>(result);
- } catch (boost::bad_lexical_cast&) {
- LOG(FATAL) << "Failed to convert '" << result << "' into an integer";
+void LogReaderProcess::_recover()
+{
+ if (!recovering.isReady()) {
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->fail(
+ recovering.isFailed() ?
+ recovering.failure() :
+ "The future 'recovering' is unexpectedly discarded");
+ delete promise;
+ }
+ promises.clear();
+ } else {
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->set(Nothing());
+ delete promise;
+ }
+ promises.clear();
}
+}
+
- // Run an election!
- elect();
+Future<Log::Position> LogReaderProcess::beginning()
+{
+ return recover().then(defer(self(), &Self::_beginning));
}
-void LogProcess::reconnecting()
+Future<Log::Position> LogReaderProcess::_beginning()
{
- LOG(INFO) << "Reconnecting to ZooKeeper";
+ CHECK(recovering.isReady());
+
+ return recovering.get()->beginning()
+ .then(lambda::bind(&Self::position, lambda::_1));
}
-void LogProcess::reconnected()
+Future<Log::Position> LogReaderProcess::ending()
{
- LOG(INFO) << "Reconnected to ZooKeeper";
+ return recover().then(defer(self(), &Self::_ending));
}
-void LogProcess::expired()
+Future<Log::Position> LogReaderProcess::_ending()
{
- restart();
+ CHECK(recovering.isReady());
+
+ return recovering.get()->ending()
+ .then(lambda::bind(&Self::position, lambda::_1));
}
-void LogProcess::updated(const string& path)
+Future<list<Log::Entry> > LogReaderProcess::read(
+ const Log::Position& from,
+ const Log::Position& to)
{
- if (znode + "/replicas" == path) {
+ return recover().then(defer(self(), &Self::_read, from, to));
+}
- regroup();
- // Reset a watch on the replicas.
- int ret = zk->getChildren(znode + "/replicas", true, NULL);
+Future<list<Log::Entry> > LogReaderProcess::_read(
+ const Log::Position& from,
+ const Log::Position& to)
+{
+ CHECK(recovering.isReady());
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
- }
- } else {
- CHECK(znode + "/coordinators" == path);
+ return recovering.get()->read(from.value, to.value)
+ .then(defer(self(), &Self::__read, from, to, lambda::_1));
+}
- elect();
- // Reset a watch on the coordinators.
- int ret = zk->getChildren(znode + "/coordinators", true, NULL);
+Future<list<Log::Entry> > LogReaderProcess::__read(
+ const Log::Position& from,
+ const Log::Position& to,
+ const list<Action>& actions)
+{
+ list<Log::Entry> entries;
+
+ uint64_t position = from.value;
+
+ foreach (const Action& action, actions) {
+ // Ensure read range is valid.
+ if (!action.has_performed() ||
+ !action.has_learned() ||
+ !action.learned()) {
+ return Failure("Bad read range (includes pending entries)");
+ } else if (position++ != action.position()) {
+ return Failure("Bad read range (includes missing entries)");
+ }
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to set a watch on '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
+ // And only return appends.
+ CHECK(action.has_type());
+ if (action.type() == Action::APPEND) {
+ entries.push_back(Log::Entry(action.position(), action.append().bytes()));
}
}
+
+ return entries;
}
-void LogProcess::initalize()
+Log::Position LogReaderProcess::position(uint64_t value)
{
- // TODO(benh): Real testing requires injecting a ZooKeeper instance.
- watcher = new LogProcessWatcher(self());
- zk = new ZooKeeper(servers, 10000, watcher);
+ return Log::Position(value);
}
-void LogProcess::regroup()
+/////////////////////////////////////////////////
+// Implementation of LogWriterProcess.
+/////////////////////////////////////////////////
+
+
+LogWriterProcess::LogWriterProcess(Log* log)
+ : ProcessBase(ID::generate("log-writer")),
+ quorum(log->process->quorum),
+ network(log->process->network),
+ recovering(dispatch(log->process, &LogProcess::recover)),
+ coordinator(NULL),
+ error(None()) {}
+
+
+void LogWriterProcess::initialize()
{
- vector<string> results;
+ recovering.onAny(defer(self(), &Self::_recover));
+}
- int ret = zk->getChildren(znode + "/replicas", false, &results);
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to get children of '" << znode << "/replicas"
- << "' in ZooKeeper: " << zk->message(ret);
+void LogWriterProcess::finalize()
+{
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->fail("Log writer is being deleted");
+ delete promise;
}
+ promises.clear();
+
+ delete coordinator;
+}
- set<UPID> current;
- set<UPID> added;
- set<UPID> removed;
- foreach (const string& result, results) {
- string s;
- int ret = zk->get(znode + "/replicas/" + result, false, &s, NULL);
- UPID pid = s;
- current.insert(pid);
+Future<Nothing> LogWriterProcess::recover()
+{
+ if (recovering.isReady()) {
+ return Nothing();
+ } else if (recovering.isFailed()) {
+ return Failure(recovering.failure());
+ } else if (recovering.isDiscarded()) {
+ return Failure("The future 'recovering' is unexpectedly discarded");
}
- foreach (const UPID& pid, current) {
- if (members.count(pid) == 0) {
- added.insert(pid);
+ // At this moment, the future 'recovering' should most likely be
+ // pending. But it is also possible that it gets set right after the
+ // above checks. Either way, we know that the continuation '_recover' has
+ // not been called yet (otherwise, we should not be able to reach
+ // here). The promise we are creating below will be properly
+ // set/failed when '_recover' is called.
+ process::Promise<Nothing>* promise = new process::Promise<Nothing>();
+ promises.push_back(promise);
+ return promise->future();
+}
+
+
+void LogWriterProcess::_recover()
+{
+ if (!recovering.isReady()) {
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->fail(
+ recovering.isFailed() ?
+ recovering.failure() :
+ "The future 'recovering' is unexpectedly discarded");
+ delete promise;
+ }
+ promises.clear();
+ } else {
+ foreach (process::Promise<Nothing>* promise, promises) {
+ promise->set(Nothing());
+ delete promise;
}
+ promises.clear();
}
+}
- foreach (const UPID& pid, members) {
- if (current.count(pid) == 0) {
- removed.insert(pid);
- }
+
+Future<Option<Log::Position> > LogWriterProcess::elect()
+{
+ return recover().then(defer(self(), &Self::_elect));
+}
+
+
+Future<Option<Log::Position> > LogWriterProcess::_elect()
+{
+ // We delete the existing coordinator (if exists) and create a new
+ // coordinator each time 'elect' is called.
+ delete coordinator;
+ error = None();
+
+ CHECK(recovering.isReady());
+
+ coordinator = new Coordinator(quorum, recovering.get(), network);
+
+ return coordinator->elect()
+ .then(defer(self(), &Self::__elect, lambda::_1))
+ .onFailed(defer(self(), &Self::failed, lambda::_1));
+}
+
+
+Option<Log::Position> LogWriterProcess::__elect(const Option<uint64_t>& result)
+{
+ if (result.isNone()) {
+ return None();
+ } else {
+ return position(result.get());
+ }
+}
+
+
+Future<Log::Position> LogWriterProcess::append(const string& bytes)
+{
+ if (coordinator == NULL) {
+ return Failure("No election has been performed");
}
- foreach (const UPID& pid, added) {
- dispatch(group, &GroupProcess::add, pid);
- members.insert(pid);
+ if (error.isSome()) {
+ return Failure(error.get());
}
- foreach (const UPID& pid, removed) {
- dispatch(group, &GroupProcess::remove, pid);
- members.erase(pid);
+ return coordinator->append(bytes)
+ .then(lambda::bind(&Self::position, lambda::_1))
+ .onFailed(defer(self(), &Self::failed, lambda::_1));
+}
+
+
+Future<Log::Position> LogWriterProcess::truncate(const Log::Position& to)
+{
+ if (coordinator == NULL) {
+ return Failure("No election has been performed");
}
+
+ if (error.isSome()) {
+ return Failure(error.get());
+ }
+
+ return coordinator->truncate(to.value)
+ .then(lambda::bind(&Self::position, lambda::_1))
+ .onFailed(defer(self(), &Self::failed, lambda::_1));
}
-void LogProcess::elect()
+void LogWriterProcess::failed(const string& message)
{
- vector<string> results;
+ error = message;
+}
- int ret = zk->getChildren(znode + "/coordinators", false, &results);
- if (ret != ZOK) {
- LOG(FATAL) << "Failed to get children of '" << znode << "/coordinators"
- << "' in ZooKeeper: " << zk->message(ret);
- }
+Log::Position LogWriterProcess::position(uint64_t value)
+{
+ return Log::Position(value);
+}
+
+
+/////////////////////////////////////////////////
+// Public interfaces for Log.
+/////////////////////////////////////////////////
+
+
+Log::Log(
+ int quorum,
+ const string& path,
+ const set<UPID>& pids)
+{
+ GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+ process = new LogProcess(quorum, path, pids);
+ spawn(process);
+}
+
+Log::Log(
+ int quorum,
+ const string& path,
+ const string& servers,
+ const Duration& timeout,
+ const string& znode,
+ const Option<zookeeper::Authentication>& auth)
+{
+ GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+ process = new LogProcess(quorum, path, servers, timeout, znode, auth);
+ spawn(process);
+}
+
+
+Log::~Log()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
- // "Elect" the minimum ephemeral znode.
- uint64_t min = LONG_MAX;
- foreach (const string& result, results) {
- try {
- min = std::min(min, boost::lexical_cast<uint64_t>(result));
- } catch (boost::bad_lexical_cast&) {
- LOG(FATAL) << "Failed to convert '" << result << "' into an integer";
+/////////////////////////////////////////////////
+// Public interfaces for Log::Reader.
+/////////////////////////////////////////////////
+
+
+Log::Reader::Reader(Log* log)
+{
+ process = new LogReaderProcess(log);
+ spawn(process);
+}
+
+
+Log::Reader::~Reader()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+Result<list<Log::Entry> > Log::Reader::read(
+ const Log::Position& from,
+ const Log::Position& to,
+ const Timeout& timeout)
+{
+ Future<list<Log::Entry> > future =
+ dispatch(process, &LogReaderProcess::read, from, to);
+
+ if (!future.await(timeout.remaining())) {
+ LOG(INFO) << "Timed out while trying to read the log";
+
+ future.discard();
+ return None();
+ } else {
+ if (!future.isReady()) {
+ string failure =
+ future.isFailed() ?
+ future.failure() :
+ "Not expecting discarded future";
+
+ LOG(ERROR) << "Failed to read the log: " << failure;
+
+ return Error(failure);
+ } else {
+ return future.get();
}
}
+}
- if (id == min && !elected) {
- elected = true;
- process::run(&coordinate, coordinator, id, end, truncations);
- } else if (elected) {
- LOG(INFO) << "Restarting due to demoted";
- restart();
- }
+
+Log::Position Log::Reader::beginning()
+{
+ // TODO(benh): Take a timeout and return an Option.
+ return dispatch(process, &LogReaderProcess::beginning).get();
}
-int main(int argc, char** argv)
+Log::Position Log::Reader::ending()
{
- if (argc < 6) {
- fatal("Usage: %s <quorum> <file> <servers> <znode> <end> <at> <to> ...",
- argv[0]);
- }
+ // TODO(benh): Take a timeout and return an Option.
+ return dispatch(process, &LogReaderProcess::ending).get();
+}
- args = argv;
- int quorum = atoi(argv[1]);
- string file = argv[2];
- string servers = argv[3];
- string znode = argv[4];
- int end = atoi(argv[5]);
+/////////////////////////////////////////////////
+// Public interfaces for Log::Writer.
+/////////////////////////////////////////////////
- map<int, int> truncations;
- for (int i = 6; argv[i] != NULL; i += 2) {
- if (argv[i + 1] == NULL) {
- fatal("Expecting 'to' argument for truncation");
+Log::Writer::Writer(Log* log, const Duration& timeout, int retries)
+{
+ process = new LogWriterProcess(log);
+ spawn(process);
+
+ // Trying to get elected.
+ for (;;) {
+ LOG(INFO) << "Attempting to get elected within " << timeout;
+
+ Future<Option<Log::Position> > future =
+ dispatch(process, &LogWriterProcess::elect);
+
+ if (!future.await(timeout)) {
+ LOG(INFO) << "Timed out while trying to get elected";
+
+ // Cancel the election. It is likely that the election is done
+ // right after the timeout has been reached. In that case, we
+ // may unnecessarily rerun the election, but it is safe.
+ future.discard();
+ } else {
+ if (!future.isReady()) {
+ string failure =
+ future.isFailed() ?
+ future.failure() :
+ "Not expecting discarded future";
+
+ LOG(ERROR) << "Failed to get elected: " << failure;
+ break;
+ } else if (future.get().isNone()) {
+ LOG(INFO) << "Lost an election, but can be retried";
+ } else {
+ LOG(INFO) << "Elected with current position "
+ << future.get().get().value;
+ return;
+ }
+ }
+
+ if (--retries < 0) {
+ LOG(ERROR) << "Retry limit has been reached during election";
+ break;
}
+ }
+}
+
+
+Log::Writer::~Writer()
+{
+ terminate(process);
+ process::wait(process);
+ delete process;
+}
+
+
+Result<Log::Position> Log::Writer::append(
+ const string& data,
+ const Timeout& timeout)
+{
+ LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
+
+ Future<Log::Position> future =
+ dispatch(process, &LogWriterProcess::append, data);
+
+ if (!future.await(timeout.remaining())) {
+ LOG(INFO) << "Timed out while trying to append the log";
- int at = atoi(argv[i]);
- int to = atoi(argv[i + 1]);
+ future.discard();
+ return None();
+ } else {
+ if (!future.isReady()) {
+ string failure =
+ future.isFailed() ?
+ future.failure() :
+ "Not expecting discarded future";
+
+ LOG(ERROR) << "Failed to append the log: " << failure;
- truncations[at] = to;
+ return Error(failure);
+ } else {
+ return future.get();
+ }
}
+}
+
+
+Result<Log::Position> Log::Writer::truncate(
+ const Log::Position& to,
+ const Timeout& timeout)
+{
+ LOG(INFO) << "Attempting to truncate the log to " << to.value;
- process::initialize();
+ Future<Log::Position> future =
+ dispatch(process, &LogWriterProcess::truncate, to);
- LogProcess log(quorum, file, servers, znode, end, truncations);
- spawn(log);
- wait(log);
+ if (!future.await(timeout.remaining())) {
+ LOG(INFO) << "Timed out while trying to truncate the log";
- return 0;
+ future.discard();
+ return None();
+ } else {
+ if (!future.isReady()) {
+ string failure =
+ future.isFailed() ?
+ future.failure() :
+ "Not expecting discarded future";
+
+ LOG(ERROR) << "Failed to truncate the log: " << failure;
+
+ return Error(failure);
+ } else {
+ return future.get();
+ }
+ }
}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
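
One pattern in the new log.cpp is worth calling out: recovery happens
exactly once, and LogProcess, LogReaderProcess, and LogWriterProcess
all gate their operations on it by parking heap-allocated promises
that a single continuation ('_recover') later drains. The promises
live on the heap so that finalize() can still fail any parked callers
when the owning process is deleted. Distilled to its shape (members of
a hypothetical Process subclass; the real code also distinguishes a
discarded future):

  std::list<process::Promise<Nothing>*> promises;
  process::Future<Nothing> recovering;  // completes exactly once

  process::Future<Nothing> recover()
  {
    if (recovering.isReady()) {
      return Nothing();  // fast path: recovery already finished
    } else if (recovering.isFailed()) {
      return process::Failure(recovering.failure());
    }

    // Still pending: park a promise for '_recover' to settle later.
    process::Promise<Nothing>* promise = new process::Promise<Nothing>();
    promises.push_back(promise);
    return promise->future();
  }

  void _recover()  // wired up via recovering.onAny(defer(...))
  {
    foreach (process::Promise<Nothing>* promise, promises) {
      if (recovering.isReady()) {
        promise->set(Nothing());
      } else {
        promise->fail(recovering.failure());
      }
      delete promise;
    }
    promises.clear();
  }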
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 042f13b..1f0b30d 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -28,15 +28,10 @@
#include <process/shared.hpp>
#include <process/timeout.hpp>
-#include <stout/check.hpp>
-#include <stout/error.hpp>
-#include <stout/foreach.hpp>
+#include <stout/duration.hpp>
#include <stout/none.hpp>
+#include <stout/option.hpp>
#include <stout/result.hpp>
-#include <stout/try.hpp>
-
-#include "log/coordinator.hpp"
-#include "log/replica.hpp"
#include "zookeeper/group.hpp"
@@ -44,6 +39,12 @@ namespace mesos {
namespace internal {
namespace log {
+// Forward declarations.
+class LogProcess;
+class LogReaderProcess;
+class LogWriterProcess;
+
+
class Log
{
public:
@@ -98,9 +99,12 @@ public:
private:
friend class Log;
- friend class Reader;
friend class Writer;
+ friend class LogReaderProcess;
+ friend class LogWriterProcess;
+
Position(uint64_t _value) : value(_value) {}
+
uint64_t value;
};
@@ -111,8 +115,8 @@ public:
std::string data;
private:
- friend class Reader;
- friend class Writer;
+ friend class LogReaderProcess;
+
Entry(const Position& _position, const std::string& _data)
: position(_position), data(_data) {}
};
@@ -125,9 +129,10 @@ public:
// Returns all entries between the specified positions, unless
// those positions are invalid, in which case returns an error.
- Result<std::list<Entry> > read(const Position& from,
- const Position& to,
- const process::Timeout& timeout);
+ Result<std::list<Entry> > read(
+ const Position& from,
+ const Position& to,
+ const process::Timeout& timeout);
// Returns the beginning position of the log from the perspective
// of the local replica (which may be out of date if the log has
@@ -141,7 +146,7 @@ public:
Position ending();
private:
- process::Shared<Replica> replica;
+ LogReaderProcess* process;
};
class Writer
@@ -171,69 +176,28 @@ public:
const process::Timeout& timeout);
private:
- Option<std::string> error;
- Coordinator coordinator;
+ LogWriterProcess* process;
};
// Creates a new replicated log that assumes the specified quorum
- // size, is backed by a file at the specified path, and coordiantes
+ // size, is backed by a file at the specified path, and coordinates
// with other replicas via the set of process PIDs.
- Log(int _quorum,
+ Log(int quorum,
const std::string& path,
- const std::set<process::UPID>& pids)
- : group(NULL),
- executor(NULL),
- quorum(_quorum),
- replica(new Replica(path))
- {
- GOOGLE_PROTOBUF_VERIFY_VERSION;
-
- // Add our own replica to the network.
- Network* _network = new Network(pids);
- _network->add(replica->pid());
-
- network.reset(_network);
- }
+ const std::set<process::UPID>& pids);
// Creates a new replicated log that assumes the specified quorum
- // size, is backed by a file at the specified path, and coordiantes
+ // size, is backed by a file at the specified path, and coordinates
// with other replicas associated with the specified ZooKeeper
// servers, timeout, and znode.
- Log(int _quorum,
+ Log(int quorum,
const std::string& path,
const std::string& servers,
const Duration& timeout,
const std::string& znode,
- const Option<zookeeper::Authentication>& auth = None())
- : group(new zookeeper::Group(servers, timeout, znode, auth)),
- executor(new process::Executor()),
- quorum(_quorum),
- replica(new Replica(path)),
- network(new ZooKeeperNetwork(servers, timeout, znode, auth))
- {
- GOOGLE_PROTOBUF_VERIFY_VERSION;
+ const Option<zookeeper::Authentication>& auth = None());
- // Need to add our replica to the ZooKeeper group!
- LOG(INFO) << "Attempting to join replica to ZooKeeper group";
-
- membership = group->join(replica->pid())
- .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-
- group->watch()
- .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
- }
-
- ~Log()
- {
- network.own().await();
- replica.own().await();
-
- delete executor;
- delete group;
- }
+ ~Log();
// Returns a position based off of the bytes recovered from
// Position.identity().
@@ -252,196 +216,13 @@ public:
((uint64_t) (bytes[7] & 0xff));
return Position(value);
}
-private:
- friend class Reader;
- friend class Writer;
-
- // TODO(benh): Factor this out into some sort of "membership renewer".
- void watch(const std::set<zookeeper::Group::Membership>& memberships);
- void failed(const std::string& message) const;
- void discarded() const;
-
- // We store a Group instance in order to continually renew the
- // replicas membership (when using ZooKeeper).
- zookeeper::Group* group;
- process::Future<zookeeper::Group::Membership> membership;
- process::Executor* executor;
-
- int quorum;
- process::Shared<Replica> replica;
- process::Shared<Network> network;
-};
-
-
-Log::Reader::Reader(Log* log)
- : replica(log->replica) {}
-
-
-Log::Reader::~Reader() {}
-
-
-Result<std::list<Log::Entry> > Log::Reader::read(
- const Log::Position& from,
- const Log::Position& to,
- const process::Timeout& timeout)
-{
- process::Future<std::list<Action> > actions =
- replica->read(from.value, to.value);
-
- if (!actions.await(timeout.remaining())) {
- return None();
- } else if (actions.isFailed()) {
- return Error(actions.failure());
- }
-
- CHECK(actions.isReady()) << "Not expecting discarded future!";
-
- std::list<Log::Entry> entries;
-
- uint64_t position = from.value;
-
- foreach (const Action& action, actions.get()) {
- // Ensure read range is valid.
- if (!action.has_performed() ||
- !action.has_learned() ||
- !action.learned()) {
- return Error("Bad read range (includes pending entries)");
- } else if (position++ != action.position()) {
- return Error("Bad read range (includes missing entries)");
- }
-
- // And only return appends.
- CHECK(action.has_type());
- if (action.type() == Action::APPEND) {
- entries.push_back(Entry(action.position(), action.append().bytes()));
- }
- }
-
- return entries;
-}
-
-
-Log::Position Log::Reader::beginning()
-{
- // TODO(benh): Take a timeout and return an Option.
- process::Future<uint64_t> value = replica->beginning();
- value.await();
- CHECK(value.isReady()) << "Not expecting a failed or discarded future!";
- return Log::Position(value.get());
-}
-
-
-Log::Position Log::Reader::ending()
-{
- // TODO(benh): Take a timeout and return an Option.
- process::Future<uint64_t> value = replica->ending();
- value.await();
- CHECK(value.isReady()) << "Not expecting a failed or discarded future!";
- return Log::Position(value.get());
-}
-
-
-Log::Writer::Writer(Log* log, const Duration& timeout, int retries)
- : error(None()),
- coordinator(log->quorum, log->replica, log->network)
-{
- do {
- Result<uint64_t> result = coordinator.elect(process::Timeout::in(timeout));
- if (result.isNone()) {
- retries--;
- } else if (result.isSome()) {
- break;
- } else {
- error = result.error();
- break;
- }
- } while (retries > 0);
-}
-
-
-Log::Writer::~Writer()
-{
- coordinator.demote();
-}
-
-
-Result<Log::Position> Log::Writer::append(
- const std::string& data,
- const process::Timeout& timeout)
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- LOG(INFO) << "Attempting to append " << data.size() << " bytes to the log";
-
- Result<uint64_t> result = coordinator.append(data, timeout);
-
- if (result.isError()) {
- error = result.error();
- return Error(error.get());
- } else if (result.isNone()) {
- return None();
- }
-
- CHECK_SOME(result);
-
- return Log::Position(result.get());
-}
-
-
-Result<Log::Position> Log::Writer::truncate(
- const Log::Position& to,
- const process::Timeout& timeout)
-{
- if (error.isSome()) {
- return Error(error.get());
- }
-
- LOG(INFO) << "Attempting to truncate the log to " << to.value;
-
- Result<uint64_t> result = coordinator.truncate(to.value, timeout);
-
- if (result.isError()) {
- error = result.error();
- return Error(error.get());
- } else if (result.isNone()) {
- return None();
- }
-
- CHECK_SOME(result);
-
- return Log::Position(result.get());
-}
-
-
-void Log::watch(const std::set<zookeeper::Group::Membership>& memberships)
-{
- if (membership.isReady() && memberships.count(membership.get()) == 0) {
- // Our replica's membership must have expired, join back up.
- LOG(INFO) << "Renewing replica group membership";
- membership = group->join(replica->pid())
- .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
- }
-
- group->watch(memberships)
- .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
-}
-
-
-void Log::failed(const std::string& message) const
-{
- LOG(FATAL) << "Failed to participate in ZooKeeper group: " << message;
-}
+private:
+ friend class LogReaderProcess;
+ friend class LogWriterProcess;
-void Log::discarded() const
-{
- LOG(FATAL) << "Not expecting future to get discarded!";
-}
+ LogProcess* process;
+};
} // namespace log {
} // namespace internal {
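
With the implementation moved out of the header, what remains is a
small synchronous facade over the asynchronous internals. A usage
sketch of the public API as declared above (quorum size, path, and
timeouts are illustrative):

  std::set<process::UPID> pids;  // PIDs of the other replicas
  Log log(2, "/tmp/replicated_log", pids);

  // The constructor runs the election (with retries) internally.
  Log::Writer writer(&log, Seconds(10), 3);

  Result<Log::Position> appended =
    writer.append("payload", process::Timeout::in(Seconds(10)));

  if (appended.isSome()) {
    Log::Reader reader(&log);

    Result<std::list<Log::Entry> > entries = reader.read(
        reader.beginning(),
        reader.ending(),
        process::Timeout::in(Seconds(10)));
  }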
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
new file mode 100644
index 0000000..0ab8e95
--- /dev/null
+++ b/src/log/recover.cpp
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "log/catchup.hpp"
+#include "log/recover.hpp"
+
+#include "messages/log.hpp"
+
+using namespace process;
+
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// This process is used to recover a replica. The flow of the recover
+// process is described as follows:
+// A) Check the status of the local replica.
+// A1) If it is VOTING, exit.
+// A2) If it is not VOTING, goto (B).
+// B) Broadcast a RecoverRequest to all replicas in the network.
+// B1) <<< Catch-up >>> If a quorum of replicas are found in VOTING
+// status (no matter what the status of the local replica is),
+// set the status of the local replica to RECOVERING, and start
+// doing catch-up. Once the local replica has caught up, set
+// the status of the local replica to VOTING and exit.
+// B2) If a quorum is not found, goto (B).
+//
+// In the following, we list a few scenarios and show how the recover
+// process will respond in those scenarios. All the examples assume a
+// quorum size of 2. Remember that a new replica is always put in
+// EMPTY status initially.
+//
+// 1) Replica A, B and C are all in VOTING status. The operator adds
+// replica D. In that case, D will go into RECOVERING status and
+// then go into VOTING status, silently growing the set of VOTING
+// replicas beyond what the quorum size assumes. Therefore, we
+// should avoid adding a new replica unless we know that one
+// replica has been removed.
+//
+// 2) Replica A and B are in VOTING status. The operator adds replica
+// C. In that case, C will go into RECOVERING status and then go
+// into VOTING status, which is expected.
+//
+// 3) Replica A is in VOTING status. The operator adds replica B. In
+// that case, B will stay in EMPTY status forever. This is expected
+// because we cannot make progress when the number of VOTING replicas
+// is less than the quorum size.
+//
+// 4) Replica A is in VOTING status and B is in EMPTY status. The
+// operator adds replica C. In that case, C will stay in EMPTY
+// status forever similar to case 3).
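+//
+// In short, the status transitions driven by this process are (a
+// rough sketch of the code below): any non-VOTING status --(quorum
+// of VOTING responses seen)--> RECOVERING --(caught up)--> VOTING.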
+class RecoverProcess : public Process<RecoverProcess>
+{
+public:
+ RecoverProcess(
+ size_t _quorum,
+ const Owned<Replica>& _replica,
+ const Shared<Network>& _network)
+ : ProcessBase(ID::generate("log-recover")),
+ quorum(_quorum),
+ replica(_replica),
+ network(_network) {}
+
+ Future<Owned<Replica> > future() { return promise.future(); }
+
+protected:
+ virtual void initialize()
+ {
+ LOG(INFO) << "Start recovering a replica";
+
+ // Stop when no one cares.
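+ // (The static_cast below picks the terminate(const UPID&, bool)
+ // overload so that the function pointer can be bound.)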
+ promise.future().onDiscarded(lambda::bind(
+ static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+ // Check the current status of the local replica and decide if
+ // recovery is needed. Recovery is needed if the local replica is
+ // not in VOTING status.
+ replica->status().onAny(defer(self(), &Self::checked, lambda::_1));
+ }
+
+ virtual void finalize()
+ {
+ LOG(INFO) << "Recover process terminated";
+
+ // Cancel all operations if they are still pending.
+ discard(responses);
+ catching.discard();
+ }
+
+private:
+ void checked(const Future<Metadata::Status>& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to get replica status: " + future.failure() :
+ "Not expecting discarded future");
+
+ terminate(self());
+ return;
+ }
+
+ status = future.get();
+
+ LOG(INFO) << "Replica is in " << status << " status";
+
+ if (status == Metadata::VOTING) {
+ promise.set(replica);
+ terminate(self());
+ } else {
+ recover();
+ }
+ }
+
+ void recover()
+ {
+ CHECK_NE(status, Metadata::VOTING);
+
+ // Broadcast recover request to all replicas.
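+ // The broadcast returns one response future per replica that is
+ // currently in the network.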
+ network->broadcast(protocol::recover, RecoverRequest())
+ .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+ }
+
+ void broadcasted(const Future<set<Future<RecoverResponse> > >& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to broadcast the recover request: " + future.failure() :
+ "Not expecting discarded future");
+
+ terminate(self());
+ return;
+ }
+
+ responses = future.get();
+
+ if (responses.empty()) {
+ // Retry if no replica is currently in the network.
+ retry();
+ } else {
+ // Instead of using a for loop here, we use select to process
+ // responses one after another so that we can ignore the rest if
+ // we have collected enough responses.
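+ // (process::select yields a future that becomes ready as soon as
+ // any future in the set becomes ready.)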
+ select(responses)
+ .onReady(defer(self(), &Self::received, lambda::_1));
+
+ // Reset the counters.
+ responsesReceived.clear();
+ lowestBeginPosition = None();
+ highestEndPosition = None();
+ }
+ }
+
+ void received(const Future<RecoverResponse>& future)
+ {
+ // Enforced by the select semantics.
+ CHECK(future.isReady());
+
+ // Remove this future from 'responses' so that we do not listen on
+ // it the next time we invoke select.
+ responses.erase(future);
+
+ const RecoverResponse& response = future.get();
+
+ LOG(INFO) << "Received a recover response from a replica in "
+ << response.status() << " status";
+
+ responsesReceived[response.status()]++;
+
+ // We need to remember the lowest begin position and highest end
+ // position seen from VOTING replicas.
+ if (response.status() == Metadata::VOTING) {
+ CHECK(response.has_begin() && response.has_end());
+
+ lowestBeginPosition = min(lowestBeginPosition, response.begin());
+ highestEndPosition = max(highestEndPosition, response.end());
+ }
+
+ // If we got responses from a quorum of VOTING replicas, the local
+ // replica will be put in RECOVERING status and start catching up.
+ // It is likely that the local replica is in RECOVERING status
+ // already. This handles the case where the replica crashed during
+ // catch-up. When it restarts, we need to recalculate the lowest
+ // begin position and the highest end position since we haven't
+ // persisted this information on disk.
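+ //
+ // For example, with quorum = 2 and responses from replicas in
+ // {VOTING, EMPTY, VOTING} status, the condition below holds after
+ // the second VOTING response arrives, and the remaining responses
+ // are discarded.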
+ if (responsesReceived[Metadata::VOTING] >= quorum) {
+ discard(responses);
+ update(Metadata::RECOVERING);
+ return;
+ }
+
+ if (responses.empty()) {
+ // All responses have been received, but we have not received
+ // enough responses from VOTING replicas to start catch-up. This
+ // is either because we don't have enough replicas in the network
+ // (e.g., a ZooKeeper blip), or we don't have enough VOTING
+ // replicas to proceed. We will retry the recovery in both cases.
+ retry();
+ } else {
+ // Wait for the next response.
+ select(responses)
+ .onReady(defer(self(), &Self::received, lambda::_1));
+ }
+ }
+
+ void update(const Metadata::Status& _status)
+ {
+ LOG(INFO) << "Updating replica status from "
+ << status << " to " << _status;
+
+ replica->update(_status)
+ .onAny(defer(self(), &Self::updated, _status, lambda::_1));
+ }
+
+ void updated(const Metadata::Status& _status, const Future<bool>& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to update replica status: " + future.failure() :
+ "Not expecting discarded future");
+
+ terminate(self());
+ return;
+ } else if (!future.get()) {
+ promise.fail("Failed to update replica status");
+ terminate(self());
+ return;
+ }
+
+ // The replica status has been updated successfully. Depending on
+ // the new status, we decide what the next action should be.
+ status = _status;
+
+ if (status == Metadata::VOTING) {
+ LOG(INFO) << "Successfully joined the Paxos group";
+
+ promise.set(replica);
+ terminate(self());
+ } else if (status == Metadata::RECOVERING) {
+ catchup();
+ } else {
+ // The replica should not be in any other status.
+ LOG(FATAL) << "Unexpected replica status";
+ }
+ }
+
+ void catchup()
+ {
+ // We reach here either because the log is empty (uninitialized),
+ // or the log is not empty but a previous unfinished catch-up
+ // attempt has been detected (the process crashed or was killed
+ // while catching up). In either case, the local replica may have
+ // lost some data and Paxos states, and should not be allowed to vote.
+ // Otherwise, we may introduce inconsistency in the log as the
+ // local replica could have accepted a write which it would not
+ // have accepted if the data and the Paxos states were not lost.
+ // Now, the question is how many positions the local replica
+ // should catch up before it can be allowed to vote. We find that
+ // it is sufficient to catch up positions from _begin_ to _end_
+ // where _begin_ is the smallest position seen in a quorum of
+ // VOTING replicas and _end_ is the largest position seen in a
+ // quorum of VOTING replicas. Here is the correctness argument.
+ // For a position _e_ larger than _end_, obviously no value has
+ // been agreed on for that position. Otherwise, we would have found
+ // at least one VOTING replica in a quorum of replicas whose end
+ // position is larger than _end_. For the same reason, a coordinator
+ // could not have collected enough promises for position _e_.
+ // Therefore, it's safe for the local replica to
+ // vote for that position. For a position _b_ smaller than
+ // _begin_, it should have already been truncated and the
+ // truncation should have already been agreed. Therefore, allowing
+ // the local replica to vote for that position is safe.
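+ //
+ // For example (quorum = 2), if the VOTING responses report
+ // [begin, end] ranges of [3, 10] and [5, 12], the local replica
+ // catches up positions 3 through 12 before it is allowed to vote.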
+ CHECK(lowestBeginPosition.isSome());
+ CHECK(highestEndPosition.isSome());
+ CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
+
+ uint64_t begin = lowestBeginPosition.get();
+ uint64_t end = highestEndPosition.get();
+
+ set<uint64_t> positions;
+ for (uint64_t p = begin; p <= end; ++p) {
+ positions.insert(p);
+ }
+
+ // Share the ownership of the replica. From this point until the
+ // point where the ownership of the replica is regained, we should
+ // not access the 'replica' field.
+ Shared<Replica> shared = replica.share();
+
+ // Since we do not know what proposal number to use (the log is
+ // empty), we use proposal number 0 and leave log::catchup to
+ // automatically bump the proposal number.
+ catching = log::catchup(quorum, shared, network, 0, positions);
+ catching.onAny(defer(self(), &Self::caughtup, shared, lambda::_1));
+ }
+
+ void caughtup(Shared<Replica> shared, const Future<Nothing>& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to catch-up: " + future.failure() :
+ "Not expecting discarded future");
+
+ terminate(self());
+ } else {
+ // Try to regain the ownership of the replica.
+ shared.own().onAny(defer(self(), &Self::owned, lambda::_1));
+ }
+ }
+
+ void owned(const Future<Owned<Replica> >& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to own the replica: " + future.failure() :
+ "Not expecting discarded future");
+
+ terminate(self());
+ } else {
+ // Allow the replica to vote once the catch-up is done.
+ replica = future.get();
+ update(Metadata::VOTING);
+ }
+ }
+
+ void retry()
+ {
+ // We add a random delay before each retry because we do not want
+ // to saturate the network/disk IO in some cases (e.g., network
+ // size is less than quorum). The delay is chosen randomly to
+ // reduce the likelihood of conflicts (e.g., a replica receiving a
+ // recover request while it is changing its status).
+ static const Duration T = Milliseconds(500);
+ Duration d = T * (1.0 + (double) ::random() / RAND_MAX);
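+ // (With T = 500ms, 'd' falls uniformly in [500ms, 1000ms].)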
+ delay(d, self(), &Self::recover);
+ }
+
+ const size_t quorum;
+ Owned<Replica> replica;
+ const Shared<Network> network;
+
+ Metadata::Status status;
+ set<Future<RecoverResponse> > responses;
+ hashmap<Metadata::Status, size_t> responsesReceived;
+ Option<uint64_t> lowestBeginPosition;
+ Option<uint64_t> highestEndPosition;
+ Future<Nothing> catching;
+
+ process::Promise<Owned<Replica> > promise;
+};
+
+
+Future<Owned<Replica> > recover(
+ size_t quorum,
+ const Owned<Replica>& replica,
+ const Shared<Network>& network)
+{
+ RecoverProcess* process = new RecoverProcess(quorum, replica, network);
+ Future<Owned<Replica> > future = process->future();
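+ // Grab the future before spawning: with 'manage' set to true the
+ // process is deleted by libprocess after it terminates, which may
+ // happen before the caller could call future().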
+ spawn(process, true);
+ return future;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f9b60c4c/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
new file mode 100644
index 0000000..634bc06
--- /dev/null
+++ b/src/log/recover.hpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_RECOVER_HPP__
+#define __LOG_RECOVER_HPP__
+
+#include <stdint.h>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+#include <process/shared.hpp>
+
+#include <stout/nothing.hpp>
+
+#include "log/network.hpp"
+#include "log/replica.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Recovers a replica by catching up enough missing positions. A
+// replica that starts with an empty log (e.g., in the case of a disk
+// failure) should not be allowed to vote. Otherwise, the new votes it
+// makes may contradict its lost votes, leading to potential
+// inconsistency in the log. Instead, the replica should be put in
+// non-voting status and catch up missing positions (and associated
+// Paxos states). The replica can be re-allowed to vote if the
+// following two conditions are met: 1) a sufficient number of missing
+// positions are recovered such that if other replicas fail, the
+// remaining replicas can restore all the successfully written log
+// entries; 2) its future votes cannot contradict its lost votes.
+// This function returns an owned pointer to the recovered replica if
+// the recovery is successful.
+extern process::Future<process::Owned<Replica> > recover(
+ size_t quorum,
+ const process::Owned<Replica>& replica,
+ const process::Shared<Network>& network);
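+//
+// A minimal usage sketch (illustrative only; 'path', 'pids' and
+// 'quorum' below are assumptions of the caller, not part of this
+// API):
+//
+//   process::Owned<Replica> replica(new Replica(path));
+//   process::Shared<Network> network(new Network(pids));
+//   process::Future<process::Owned<Replica> > recovered =
+//     recover(quorum, replica, network);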
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_RECOVER_HPP__