You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/17 01:58:22 UTC
[02/10] git commit: Decoupled replicated log coordinator logic and
made it asynchronous.
Decoupled replicated log coordinator logic and made it asynchronous.
This is the first patch of a series of patches that implement a
catch-up mechanism for replicated log. See the following ticket for
more details: https://issues.apache.org/jira/browse/MESOS-736.
Here is a brief summary of this patch (sorry that we were not able to
break it into smaller patches):
1) Pulled the original Coordinator logic out and divided it into
several Paxos phases (see src/log/consensus.hpp). Instead of using
blocking semantics, we implemented all the logic asynchronously.
2) In order to ensure the liveness of a replica that is catching up,
we implemented retry logic that bumps the proposal number. This also
requires us to slightly change the existing replica protocol.
3) Made the "fill" operation independent of the underlying
replica. Instead, introduced a catchup function (see
src/log/catchup.hpp) to make sure the underlying local replica has
learned each write.
4) Modified the log tests to adapt to the new semantics (see (3)
above).
This is joint work with Yan Xu.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/14631
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/19ad88b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/19ad88b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/19ad88b7
Branch: refs/heads/master
Commit: 19ad88b7c45164c1272001493bdd176d80a88b91
Parents: 2ff5308
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Jan 16 16:51:45 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Jan 16 16:51:45 2014 -0800
----------------------------------------------------------------------
src/Makefile.am | 17 +-
src/log/catchup.cpp | 286 +++++++++++++++++
src/log/catchup.hpp | 54 ++++
src/log/consensus.cpp | 711 +++++++++++++++++++++++++++++++++++++++++++
src/log/consensus.hpp | 136 +++++++++
src/log/coordinator.cpp | 472 ++++++----------------------
src/log/coordinator.hpp | 58 +---
src/log/log.hpp | 69 ++---
src/log/network.hpp | 57 ++--
src/log/replica.cpp | 351 +++++++++++----------
src/log/replica.hpp | 25 +-
src/messages/log.proto | 91 +++---
src/tests/log_tests.cpp | 420 +++++++++++++------------
13 files changed, 1866 insertions(+), 881 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index cf0c8c6..17fbf83 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -297,9 +297,20 @@ libmesos_no_3rdparty_la_LIBADD += libbuild.la
# Convenience library for building the replicated log in order to
# include the leveldb headers.
noinst_LTLIBRARIES += liblog.la
-liblog_la_SOURCES = log/coordinator.cpp log/replica.cpp
-liblog_la_SOURCES += log/coordinator.hpp log/replica.hpp log/log.hpp \
- log/network.hpp messages/log.hpp messages/log.proto
+liblog_la_SOURCES = \
+ log/catchup.cpp \
+ log/consensus.cpp \
+ log/coordinator.cpp \
+ log/replica.cpp
+liblog_la_SOURCES += \
+ log/catchup.hpp \
+ log/consensus.hpp \
+ log/coordinator.hpp \
+ log/replica.hpp \
+ log/log.hpp \
+ log/network.hpp \
+ messages/log.hpp \
+ messages/log.proto
nodist_liblog_la_SOURCES = $(LOG_PROTOS)
liblog_la_CPPFLAGS = -I../$(LEVELDB)/include $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
new file mode 100644
index 0000000..5825eae
--- /dev/null
+++ b/src/log/catchup.cpp
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <list>
+
+#include <process/collect.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/stringify.hpp>
+
+#include "log/catchup.hpp"
+#include "log/consensus.hpp"
+
+#include "messages/log.hpp"
+
+using namespace process;
+
+using std::list;
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Drives a single log position to the learned state in the local
+// replica. It repeatedly asks the local replica whether 'position' is
+// still missing and, while it is, runs a Paxos fill for it. The
+// result future is set to the highest proposal number seen once the
+// replica reports the position as no longer missing, or failed if any
+// step fails.
+class CatchUpProcess : public Process<CatchUpProcess>
+{
+public:
+  CatchUpProcess(
+      size_t _quorum,
+      const Shared<Replica>& _replica,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      uint64_t _position)
+    : ProcessBase(ID::generate("log-catch-up")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network),
+      position(_position),
+      proposal(_proposal) {}
+
+  virtual ~CatchUpProcess() {}
+
+  // The outcome of this catch-up; discarding it terminates the process.
+  Future<uint64_t> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Terminate ourselves as soon as the caller discards the result.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    check();
+  }
+
+  virtual void finalize()
+  {
+    // Abandon whatever request is still in flight.
+    checking.discard();
+    filling.discard();
+  }
+
+private:
+  // Asks the local replica whether 'position' is still missing.
+  void check()
+  {
+    checking = replica->missing(position);
+    checking.onAny(defer(self(), &Self::checked));
+  }
+
+  void checked()
+  {
+    // Only 'finalize' ever discards 'checking', so it cannot be
+    // discarded while this process is still handling events.
+    CHECK(!checking.isDiscarded());
+
+    if (checking.isFailed()) {
+      promise.fail("Failed to get missing positions: " + checking.failure());
+      terminate(self());
+      return;
+    }
+
+    if (checking.get()) {
+      // The replica still lacks this position; try to fill it.
+      fill();
+      return;
+    }
+
+    // The position has been learned.
+    promise.set(proposal);
+    terminate(self());
+  }
+
+  // Runs a Paxos fill for 'position' using the current proposal number.
+  void fill()
+  {
+    filling = log::fill(quorum, network, proposal, position);
+    filling.onAny(defer(self(), &Self::filled));
+  }
+
+  void filled()
+  {
+    // Only 'finalize' ever discards 'filling'.
+    CHECK(!filling.isDiscarded());
+
+    if (filling.isFailed()) {
+      promise.fail("Failed to fill missing position: " + filling.failure());
+      terminate(self());
+      return;
+    }
+
+    // Adopt the proposal number the fill ended up with, so that a
+    // subsequent fill (if one is needed) can skip a proposal bump.
+    const uint64_t promised = filling.get().promised();
+    CHECK(promised >= proposal);
+    proposal = promised;
+
+    // Ask the local replica again whether the position is learned now.
+    check();
+  }
+
+  const size_t quorum;
+  const Shared<Replica> replica;
+  const Shared<Network> network;
+  const uint64_t position;
+
+  uint64_t proposal;
+
+  process::Promise<uint64_t> promise;
+  Future<bool> checking;
+  Future<Action> filling;
+};
+
+
+// Catches up a single log position in the local replica. The returned
+// future holds the highest proposal number seen, which callers can
+// reuse as a hint to avoid extra proposal number bumps.
+static Future<uint64_t> catchup(
+    size_t quorum,
+    const Shared<Replica>& replica,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position)
+{
+  CatchUpProcess* catcher =
+    new CatchUpProcess(quorum, replica, network, proposal, position);
+
+  // Take the future before spawning: once spawned, the process runs
+  // (and may finish) independently of this function.
+  Future<uint64_t> result = catcher->future();
+  spawn(catcher, true);
+  return result;
+}
+
+
+// TODO(jieyu): Our current implementation catches up each position in
+// the set sequentially. In the future, we may want to parallelize it
+// to improve performance. Also, we may want to implement rate
+// control here so that we don't saturate the network or disk.
+//
+// Catches up every position in 'positions' by running the
+// single-position 'log::catchup' on each one in ascending order. The
+// highest proposal number returned by one position is used as the
+// starting proposal for the next. The result future is set once all
+// positions are caught up, or failed at the first position that fails.
+class BulkCatchUpProcess : public Process<BulkCatchUpProcess>
+{
+public:
+  BulkCatchUpProcess(
+      size_t _quorum,
+      const Shared<Replica>& _replica,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      const set<uint64_t>& _positions)
+    : ProcessBase(ID::generate("log-bulk-catch-up")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network),
+      positions(_positions),
+      proposal(_proposal) {}
+
+  virtual ~BulkCatchUpProcess() {}
+
+  // The outcome of the bulk catch-up; discarding it terminates the
+  // process.
+  Future<Nothing> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    // Catch up each position in the set sequentially.
+    it = positions.begin();
+
+    catchup();
+  }
+
+  virtual void finalize()
+  {
+    catching.discard();
+  }
+
+private:
+  // Starts catching up the position at 'it', or completes successfully
+  // if every position has been handled.
+  void catchup()
+  {
+    if (it == positions.end()) {
+      promise.set(Nothing());
+      terminate(self());
+      return;
+    }
+
+    // Store the future so that we can discard it if the user wants to
+    // cancel the catch-up operation.
+    catching = log::catchup(quorum, replica, network, proposal, *it);
+    catching.onAny(defer(self(), &Self::caughtup));
+  }
+
+  void caughtup()
+  {
+    // No one can discard the future 'catching' except 'finalize'.
+    CHECK(!catching.isDiscarded());
+
+    if (catching.isFailed()) {
+      promise.fail(
+          "Failed to catch-up position " + stringify(*it) +
+          ": " + catching.failure());
+      terminate(self());
+      return;
+    }
+
+    ++it;
+
+    // The single-position catch-up function 'log::catchup' returns the
+    // highest proposal number seen so far. We use this proposal number
+    // for the next 'catchup' because it is highly likely to be high
+    // enough, which saves potentially unnecessary proposal bumps.
+    proposal = catching.get();
+
+    catchup();
+  }
+
+  const size_t quorum;
+  const Shared<Replica> replica;
+  const Shared<Network> network;
+  const set<uint64_t> positions;
+
+  uint64_t proposal;
+
+  // 'positions' is const, so 'positions.begin()' yields a
+  // const_iterator. The standard does not guarantee that
+  // set::iterator and set::const_iterator are the same type.
+  set<uint64_t>::const_iterator it;
+
+  process::Promise<Nothing> promise;
+  Future<uint64_t> catching;
+};
+
+
+/////////////////////////////////////////////////
+// Public interfaces below.
+/////////////////////////////////////////////////
+
+
+// Catches up every position in 'positions' in the local replica. The
+// returned future is set once all of them have been caught up, and is
+// failed as soon as any single position fails. 'proposal' is only a
+// starting hint for the Paxos proposal number.
+Future<Nothing> catchup(
+    size_t quorum,
+    const Shared<Replica>& replica,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const set<uint64_t>& positions)
+{
+  BulkCatchUpProcess* bulk = new BulkCatchUpProcess(
+      quorum, replica, network, proposal, positions);
+
+  // Take the future before spawning: once spawned, the process runs
+  // (and may finish) independently of this function.
+  Future<Nothing> result = bulk->future();
+  spawn(bulk, true);
+  return result;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/catchup.hpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp
new file mode 100644
index 0000000..3652830
--- /dev/null
+++ b/src/log/catchup.hpp
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_CATCHUP_HPP__
+#define __LOG_CATCHUP_HPP__
+
+#include <stdint.h>
+
+#include <set>
+
+#include <process/future.hpp>
+#include <process/shared.hpp>
+
+#include <stout/nothing.hpp>
+
+#include "log/network.hpp"
+#include "log/replica.hpp"
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Catches up a set of log positions in the local replica, i.e., makes
+// sure the local replica has learned each of them. The returned future
+// is set once every position has been caught up, and is failed if
+// catching up any single position fails.
+//
+// The user of this function can provide a hint on the proposal number
+// that will be used for Paxos. This could potentially save us a few
+// Paxos rounds. However, if the user has no idea what proposal number
+// to use, they can just use an arbitrary proposal number (e.g., 0).
+extern process::Future<Nothing> catchup(
+ size_t quorum,
+ const process::Shared<Replica>& replica,
+ const process::Shared<Network>& network,
+ uint64_t proposal,
+ const std::set<uint64_t>& positions);
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_CATCHUP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/consensus.cpp
----------------------------------------------------------------------
diff --git a/src/log/consensus.cpp b/src/log/consensus.cpp
new file mode 100644
index 0000000..5eb90e7
--- /dev/null
+++ b/src/log/consensus.cpp
@@ -0,0 +1,711 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdlib.h>
+
+#include <set>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/id.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/foreach.hpp>
+
+#include "log/consensus.hpp"
+#include "log/replica.hpp"
+
+using namespace process;
+
+using std::set;
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Runs the explicit promise (prepare) phase of Paxos for a single log
+// position. It broadcasts a PromiseRequest carrying both 'proposal' and
+// 'position', then aggregates the replies. It completes early with the
+// first reply that reports a learned action. Otherwise it completes
+// once 'quorum' successful replies have been counted. In that case the
+// result is a NACK carrying the highest rejecting proposal number if
+// any counted reply was a NACK. If none was, it is an ACK carrying the
+// performed action with the highest 'performed' number, if any.
+// Only replies whose futures become ready are counted.
+class ExplicitPromiseProcess : public Process<ExplicitPromiseProcess>
+{
+public:
+ ExplicitPromiseProcess(
+ size_t _quorum,
+ const Shared<Network>& _network,
+ uint64_t _proposal,
+ uint64_t _position)
+ : ProcessBase(ID::generate("log-explicit-promise")),
+ quorum(_quorum),
+ network(_network),
+ proposal(_proposal),
+ position(_position),
+ responsesReceived(0) {}
+
+ virtual ~ExplicitPromiseProcess() {}
+
+ // The aggregated result of this phase; discarding it terminates the
+ // process.
+ Future<PromiseResponse> future() { return promise.future(); }
+
+protected:
+ virtual void initialize()
+ {
+ // Stop when no one cares.
+ promise.future().onDiscarded(lambda::bind(
+ static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+ request.set_proposal(proposal);
+ request.set_position(position);
+
+ network->broadcast(protocol::promise, request)
+ .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+ }
+
+ virtual void finalize()
+ {
+ // This process will be terminated when we get responses from a
+ // quorum of replicas. In that case, we no longer care about
+ // responses from other replicas, thus discarding them here.
+ discard(responses);
+ }
+
+private:
+ // Called with the set of per-replica response futures once the
+ // broadcast itself has completed (or failed).
+ void broadcasted(const Future<set<Future<PromiseResponse> > >& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to broadcast explicit promise request: " + future.failure() :
+ "Not expecting discarded future");
+ terminate(self());
+ return;
+ }
+
+ responses = future.get();
+ // Failed or discarded replies never reach 'received' (onReady only).
+ foreach (const Future<PromiseResponse>& response, responses) {
+ response.onReady(defer(self(), &Self::received, lambda::_1));
+ }
+ }
+
+ // Folds one replica's reply into the running aggregate and completes
+ // the phase once the termination condition is met.
+ void received(const PromiseResponse& response)
+ {
+ responsesReceived++;
+
+ if (!response.okay()) {
+ // Failed to get the promise from a replica for this position
+ // because it has been promised to a proposer with a higher
+ // proposal number. The 'proposal' field in the response
+ // specifies the proposal number. It is found to be larger than
+ // the proposal number used in this phase.
+ if (highestNackProposal.isNone() ||
+ highestNackProposal.get() < response.proposal()) {
+ highestNackProposal = response.proposal();
+ }
+ } else if (highestNackProposal.isSome()) {
+ // We still want to wait for more potential NACK responses so we
+ // can return the highest proposal number seen but we don't care
+ // about any more ACK responses.
+ } else {
+ // The position has been promised to us so the 'proposal' field
+ // should match the proposal we sent in the request.
+ CHECK_EQ(response.proposal(), request.proposal());
+
+ if (response.has_action()) {
+ CHECK_EQ(response.action().position(), position);
+ if (response.action().has_learned() && response.action().learned()) {
+ // Received a learned action. Note that there is no checking
+ // that we get the _same_ learned action in the event we get
+ // multiple responses with learned actions, we just take the
+ // "first". In fact, there is a specific instance in which
+ // learned actions will NOT be the same! In this instance,
+ // one replica may return that the action is a learned no-op
+ // because it knows the position has been truncated while
+ // another replica (that hasn't learned the truncation yet)
+ // might return the actual action at this position. Picking
+ // either action is _correct_, since eventually we know this
+ // position will be truncated. Fun!
+ promise.set(response);
+
+ // The remaining responses will be discarded in 'finalize'.
+ terminate(self());
+ return;
+ } else if (response.action().has_performed()) {
+ // An action has already been performed in this position, we
+ // need to save the action with the highest proposal number.
+ if (highestAckAction.isNone() ||
+ (highestAckAction.get().performed() <
+ response.action().performed())) {
+ highestAckAction = response.action();
+ }
+ } else {
+ // Received a response for a position that had previously
+ // been promised to some other proposer but an action had
+ // not been performed or learned. The position is now
+ // promised to us. No need to do anything here.
+ }
+ } else {
+ // Received a response without an associated action. This is
+ // the case where this proposer is the first one to ask for a
+ // promise for this log position.
+ CHECK(response.has_position());
+ CHECK_EQ(response.position(), position);
+ }
+ }
+
+ if (responsesReceived >= quorum) {
+ // A quorum of replicas have replied.
+ PromiseResponse result;
+
+ if (highestNackProposal.isSome()) {
+ result.set_okay(false);
+ result.set_proposal(highestNackProposal.get());
+ } else {
+ result.set_okay(true);
+ if (highestAckAction.isSome()) {
+ result.mutable_action()->CopyFrom(highestAckAction.get());
+ }
+ }
+
+ promise.set(result);
+ terminate(self());
+ }
+ }
+
+ const size_t quorum;
+ const Shared<Network> network;
+ const uint64_t proposal;
+ const uint64_t position;
+
+ PromiseRequest request;
+ // Per-replica reply futures; discarded in 'finalize'.
+ set<Future<PromiseResponse> > responses;
+ // Number of ready replies processed by 'received' so far.
+ size_t responsesReceived;
+ // Highest proposal number among NACKs received so far, if any.
+ Option<uint64_t> highestNackProposal;
+ // Performed (not learned) action with the highest 'performed' number
+ // among ACKs received so far, if any.
+ Option<Action> highestAckAction;
+
+ process::Promise<PromiseResponse> promise;
+};
+
+
+// Runs the implicit promise phase of Paxos. It broadcasts a
+// PromiseRequest carrying only 'proposal' (no position) and completes
+// once 'quorum' successful replies have been counted. The result is a
+// NACK carrying the highest rejecting proposal number if any counted
+// reply was a NACK. Otherwise it is an ACK whose 'position' is the
+// highest position reported by the ACKing replies. That is presumably
+// each replica's end position, given the member name; verify against
+// replica.cpp. Only replies whose futures become ready are counted.
+class ImplicitPromiseProcess : public Process<ImplicitPromiseProcess>
+{
+public:
+ ImplicitPromiseProcess(
+ size_t _quorum,
+ const Shared<Network>& _network,
+ uint64_t _proposal)
+ : ProcessBase(ID::generate("log-implicit-promise")),
+ quorum(_quorum),
+ network(_network),
+ proposal(_proposal),
+ responsesReceived(0) {}
+
+ virtual ~ImplicitPromiseProcess() {}
+
+ // The aggregated result of this phase; discarding it terminates the
+ // process.
+ Future<PromiseResponse> future() { return promise.future(); }
+
+protected:
+ virtual void initialize()
+ {
+ // Stop when no one cares.
+ promise.future().onDiscarded(lambda::bind(
+ static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+ // No position is set: this is what makes the promise "implicit".
+ request.set_proposal(proposal);
+
+ network->broadcast(protocol::promise, request)
+ .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+ }
+
+ virtual void finalize()
+ {
+ // This process will be terminated when we get responses from a
+ // quorum of replicas. In that case, we no longer care about
+ // responses from other replicas, thus discarding them here.
+ discard(responses);
+ }
+
+private:
+ // Called with the set of per-replica response futures once the
+ // broadcast itself has completed (or failed).
+ void broadcasted(const Future<set<Future<PromiseResponse> > >& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to broadcast implicit promise request: " + future.failure() :
+ "Not expecting discarded future");
+ terminate(self());
+ return;
+ }
+
+ responses = future.get();
+ // Failed or discarded replies never reach 'received' (onReady only).
+ foreach (const Future<PromiseResponse>& response, responses) {
+ response.onReady(defer(self(), &Self::received, lambda::_1));
+ }
+ }
+
+ // Folds one replica's reply into the running aggregate and completes
+ // the phase once 'quorum' replies have been counted.
+ void received(const PromiseResponse& response)
+ {
+ responsesReceived++;
+
+ if (!response.okay()) {
+ // Failed to get the promise from a replica because it has
+ // promised a proposer with a higher proposal number. The
+ // 'proposal' field in the response specifies the proposal
+ // number. It is found to be larger than the proposal number
+ // used in this phase.
+ if (highestNackProposal.isNone() ||
+ highestNackProposal.get() < response.proposal()) {
+ highestNackProposal = response.proposal();
+ }
+ } else if (highestNackProposal.isSome()) {
+ // We still want to wait for more potential NACK responses so we
+ // can return the highest proposal number seen but we don't care
+ // about any more ACK responses.
+ } else {
+ CHECK(response.has_position());
+ if (highestEndPosition.isNone() ||
+ highestEndPosition.get() < response.position()) {
+ highestEndPosition = response.position();
+ }
+ }
+
+ if (responsesReceived >= quorum) {
+ // A quorum of replicas have replied.
+ PromiseResponse result;
+
+ if (highestNackProposal.isSome()) {
+ result.set_okay(false);
+ result.set_proposal(highestNackProposal.get());
+ } else {
+ // Every counted reply was an ACK, so at least one of them has
+ // recorded a position above.
+ CHECK_SOME(highestEndPosition);
+
+ result.set_okay(true);
+ result.set_position(highestEndPosition.get());
+ }
+
+ promise.set(result);
+ terminate(self());
+ }
+ }
+
+ const size_t quorum;
+ const Shared<Network> network;
+ const uint64_t proposal;
+
+ PromiseRequest request;
+ // Per-replica reply futures; discarded in 'finalize'.
+ set<Future<PromiseResponse> > responses;
+ // Number of ready replies processed by 'received' so far.
+ size_t responsesReceived;
+ // Highest proposal number among NACKs received so far, if any.
+ Option<uint64_t> highestNackProposal;
+ // Highest 'position' among ACKs received so far, if any.
+ Option<uint64_t> highestEndPosition;
+
+ process::Promise<PromiseResponse> promise;
+};
+
+
+// Runs the write (accept) phase of Paxos for a single action. It
+// broadcasts a WriteRequest built from 'action', with its proposal
+// number taken from the 'proposal' argument. It completes once
+// 'quorum' successful replies have been counted. The result is a NACK
+// carrying the highest rejecting proposal number if any counted reply
+// was a NACK, and an ACK otherwise. Only replies whose futures become
+// ready are counted.
+class WriteProcess : public Process<WriteProcess>
+{
+public:
+ WriteProcess(
+ size_t _quorum,
+ const Shared<Network>& _network,
+ uint64_t _proposal,
+ const Action& _action)
+ : ProcessBase(ID::generate("log-write")),
+ quorum(_quorum),
+ network(_network),
+ proposal(_proposal),
+ action(_action),
+ responsesReceived(0) {}
+
+ virtual ~WriteProcess() {}
+
+ // The aggregated result of this phase; discarding it terminates the
+ // process.
+ Future<WriteResponse> future() { return promise.future(); }
+
+protected:
+ virtual void initialize()
+ {
+ // Stop when no one cares.
+ promise.future().onDiscarded(lambda::bind(
+ static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+ request.set_proposal(proposal);
+ request.set_position(action.position());
+ request.set_type(action.type());
+ // Copy only the payload that matches the action's type.
+ switch (action.type()) {
+ case Action::NOP:
+ CHECK(action.has_nop());
+ request.mutable_nop();
+ break;
+ case Action::APPEND:
+ CHECK(action.has_append());
+ request.mutable_append()->CopyFrom(action.append());
+ break;
+ case Action::TRUNCATE:
+ CHECK(action.has_truncate());
+ request.mutable_truncate()->CopyFrom(action.truncate());
+ break;
+ default:
+ LOG(FATAL) << "Unknown Action::Type " << action.type();
+ }
+
+ network->broadcast(protocol::write, request)
+ .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+ }
+
+ virtual void finalize()
+ {
+ // This process will be terminated when we get responses from a
+ // quorum of replicas. In that case, we no longer care about
+ // responses from other replicas, thus discarding them here.
+ discard(responses);
+ }
+
+private:
+ // Called with the set of per-replica response futures once the
+ // broadcast itself has completed (or failed).
+ void broadcasted(const Future<set<Future<WriteResponse> > >& future)
+ {
+ if (!future.isReady()) {
+ promise.fail(
+ future.isFailed() ?
+ "Failed to broadcast the write request: " + future.failure() :
+ "Not expecting discarded future");
+ terminate(self());
+ return;
+ }
+
+ responses = future.get();
+ // Failed or discarded replies never reach 'received' (onReady only).
+ foreach (const Future<WriteResponse>& response, responses) {
+ response.onReady(defer(self(), &Self::received, lambda::_1));
+ }
+ }
+
+ // Folds one replica's reply into the running aggregate and completes
+ // the phase once 'quorum' replies have been counted.
+ void received(const WriteResponse& response)
+ {
+ CHECK_EQ(response.position(), request.position());
+
+ responsesReceived++;
+
+ if (!response.okay()) {
+ // A replica rejects the write request because this position has
+ // been promised to a proposer with a higher proposal number.
+ // The 'proposal' field in the response specifies the proposal
+ // number. It is found to be larger than the proposal number
+ // used in this phase.
+ if (highestNackProposal.isNone() ||
+ highestNackProposal.get() < response.proposal()) {
+ highestNackProposal = response.proposal();
+ }
+ }
+
+ if (responsesReceived >= quorum) {
+ // A quorum of replicas have replied.
+ WriteResponse result;
+
+ if (highestNackProposal.isSome()) {
+ result.set_okay(false);
+ result.set_proposal(highestNackProposal.get());
+ } else {
+ result.set_okay(true);
+ }
+
+ promise.set(result);
+ terminate(self());
+ }
+ }
+
+ const size_t quorum;
+ const Shared<Network> network;
+ const uint64_t proposal;
+ const Action action;
+
+ WriteRequest request;
+ // Per-replica reply futures; discarded in 'finalize'.
+ set<Future<WriteResponse> > responses;
+ // Number of ready replies processed by 'received' so far.
+ size_t responsesReceived;
+ // Highest proposal number among NACKs received so far, if any.
+ Option<uint64_t> highestNackProposal;
+
+ process::Promise<WriteResponse> promise;
+};
+
+
+// An internal Paxos proposer that drives a single log position to
+// consensus.
+//
+// It runs the explicit promise phase for 'position':
+//   - If a learned action is found, it is broadcast as learned.
+//   - If a performed (unlearned) action is found, Paxos requires us to
+//     re-write that same action.
+//   - Otherwise a NOP is written.
+// After a successful write, the action is broadcast as learned, and
+// the result future is set to that learned action. A NACK in the
+// promise or write phase triggers a randomized back-off, followed by a
+// retry with a higher proposal number.
+class FillProcess : public Process<FillProcess>
+{
+public:
+  FillProcess(
+      size_t _quorum,
+      const Shared<Network>& _network,
+      uint64_t _proposal,
+      uint64_t _position)
+    : ProcessBase(ID::generate("log-fill")),
+      quorum(_quorum),
+      network(_network),
+      position(_position),
+      proposal(_proposal) {}
+
+  virtual ~FillProcess() {}
+
+  // The learned action; discarding it terminates the process.
+  Future<Action> future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
+  {
+    // Stop when no one cares.
+    promise.future().onDiscarded(lambda::bind(
+        static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
+
+    runPromisePhase();
+  }
+
+  virtual void finalize()
+  {
+    promising.discard();
+    writing.discard();
+  }
+
+private:
+  void runPromisePhase()
+  {
+    promising = log::promise(quorum, network, proposal, position);
+    promising.onAny(defer(self(), &Self::checkPromisePhase));
+  }
+
+  void checkPromisePhase()
+  {
+    // The future 'promising' can only be discarded in 'finalize'.
+    CHECK(!promising.isDiscarded());
+
+    if (promising.isFailed()) {
+      promise.fail("Explicit promise phase failed: " + promising.failure());
+      terminate(self());
+    } else {
+      const PromiseResponse& response = promising.get();
+      if (!response.okay()) {
+        // Retry with a higher proposal number.
+        retry(response.proposal());
+      } else if (response.has_action()) {
+        // A previously performed write has been found. Paxos
+        // restricts us to write the same value.
+        Action action = response.action();
+
+        CHECK_EQ(action.position(), position);
+        CHECK(action.has_type());
+        action.set_promised(proposal);
+        action.set_performed(proposal);
+
+        if (action.has_learned() && action.learned()) {
+          // If the promise phase returns a learned action, we simply
+          // learn the action by broadcasting a learned message. We
+          // don't check if a quorum of replicas acknowledge the
+          // learned message. Because of that, a catch-up replica
+          // needs to make sure that all positions it needs to recover
+          // have been learned before it can re-join the Paxos (i.e.,
+          // invoking log::catchup). Otherwise, we may not have a
+          // quorum of replicas remember an agreed value, leading to
+          // potential inconsistency in the log.
+          runLearnPhase(action);
+        } else {
+          runWritePhase(action);
+        }
+      } else {
+        // No previously performed write has been found. We can
+        // write any value. We choose to write a NOP.
+        Action action;
+        action.set_position(position);
+        action.set_promised(proposal);
+        action.set_performed(proposal);
+        action.set_type(Action::NOP);
+        action.mutable_nop();
+
+        runWritePhase(action);
+      }
+    }
+  }
+
+  void runWritePhase(const Action& action)
+  {
+    CHECK(!action.has_learned() || !action.learned());
+
+    writing = log::write(quorum, network, proposal, action);
+    writing.onAny(defer(self(), &Self::checkWritePhase, action));
+  }
+
+  void checkWritePhase(const Action& action)
+  {
+    // The future 'writing' can only be discarded in 'finalize'.
+    CHECK(!writing.isDiscarded());
+
+    if (writing.isFailed()) {
+      promise.fail("Write phase failed: " + writing.failure());
+      terminate(self());
+    } else {
+      const WriteResponse& response = writing.get();
+      if (!response.okay()) {
+        // Retry with a higher proposal number.
+        retry(response.proposal());
+      } else {
+        // The write has been accepted (and thus performed) by a
+        // quorum of replicas. A consensus has been reached.
+        Action learnedAction = action;
+        learnedAction.set_learned(true);
+
+        runLearnPhase(learnedAction);
+      }
+    }
+  }
+
+  void runLearnPhase(const Action& action)
+  {
+    CHECK(action.has_learned() && action.learned());
+
+    // We need to make sure that the learned message has been
+    // broadcasted before the fill process completes. Some users may
+    // rely on this invariant (e.g. checking if the local replica has
+    // learned the action).
+    log::learn(network, action)
+      .onAny(defer(self(), &Self::checkLearnPhase, action, lambda::_1));
+  }
+
+  void checkLearnPhase(const Action& action, const Future<Nothing>& future)
+  {
+    if (!future.isReady()) {
+      // This is the learn phase, not the write phase; report it as such.
+      promise.fail(
+          future.isFailed() ?
+          "Learn phase failed: " + future.failure() :
+          "Not expecting discarded future");
+      terminate(self());
+    } else {
+      promise.set(action);
+      terminate(self());
+    }
+  }
+
+  // Bumps the proposal number past 'highestNackProposal' and schedules
+  // another promise phase after a randomized back-off.
+  void retry(uint64_t highestNackProposal)
+  {
+    // See comments below.
+    static const Duration T = Milliseconds(100);
+
+    // POSIX specifies that random() returns values in [0, 2^31 - 1],
+    // independent of RAND_MAX (which describes rand()). Normalizing by
+    // 2^31 yields a factor in [0, 1) on every platform.
+    static const double RANDOM_RANGE = 2147483648.0; // 2^31
+
+    // Bump the proposal number.
+    CHECK(highestNackProposal >= proposal);
+    proposal = highestNackProposal + 1;
+
+    // Randomized back-off. Generate a random delay in [T, 2T). T has
+    // to be chosen carefully. We want T >> broadcast time such that
+    // one proposer usually times out and wins before others wake up.
+    // On the other hand, we want T to be as small as possible such
+    // that we can reduce the wait time.
+    const double factor = static_cast<double>(::random()) / RANDOM_RANGE;
+    Duration d = T * (1.0 + factor);
+    delay(d, self(), &Self::runPromisePhase);
+  }
+
+  const size_t quorum;
+  const Shared<Network> network;
+  const uint64_t position;
+
+  uint64_t proposal;
+
+  process::Promise<Action> promise;
+  Future<PromiseResponse> promising;
+  Future<WriteResponse> writing;
+};
+
+
+/////////////////////////////////////////////////
+// Public interfaces below.
+/////////////////////////////////////////////////
+
+
+// Runs the Paxos promise phase. If 'position' is set, an explicit
+// promise is requested for that single log position. If it is None,
+// an implicit promise (no position) is requested instead.
+Future<PromiseResponse> promise(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const Option<uint64_t>& position)
+{
+  if (position.isSome()) {
+    ExplicitPromiseProcess* explicitPromise =
+      new ExplicitPromiseProcess(quorum, network, proposal, position.get());
+
+    Future<PromiseResponse> result = explicitPromise->future();
+    spawn(explicitPromise, true);
+    return result;
+  }
+
+  ImplicitPromiseProcess* implicitPromise =
+    new ImplicitPromiseProcess(quorum, network, proposal);
+
+  Future<PromiseResponse> result = implicitPromise->future();
+  spawn(implicitPromise, true);
+  return result;
+}
+
+
+// Runs the Paxos write (accept) phase for 'action' using 'proposal'.
+Future<WriteResponse> write(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    const Action& action)
+{
+  WriteProcess* writer = new WriteProcess(quorum, network, proposal, action);
+
+  // Take the future before spawning: once spawned, the process runs
+  // (and may finish) independently of this function.
+  Future<WriteResponse> result = writer->future();
+  spawn(writer, true);
+  return result;
+}
+
+
+// Broadcasts a LearnedMessage for 'action' to the network. The action
+// in the message is always marked as learned, even if 'action' itself
+// is not. No acknowledgement from any quorum is awaited; the future
+// simply reflects the broadcast.
+Future<Nothing> learn(const Shared<Network>& network, const Action& action)
+{
+  LearnedMessage message;
+  Action* learned = message.mutable_action();
+  learned->CopyFrom(action);
+
+  const bool alreadyLearned = action.has_learned() && action.learned();
+  if (!alreadyLearned) {
+    learned->set_learned(true);
+  }
+
+  return network->broadcast(message);
+}
+
+
+// Runs a full Paxos round (promise, write if needed, learn) for
+// 'position', starting from 'proposal'. The returned future holds the
+// learned action.
+Future<Action> fill(
+    size_t quorum,
+    const Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position)
+{
+  FillProcess* filler = new FillProcess(quorum, network, proposal, position);
+
+  // Take the future before spawning: once spawned, the process runs
+  // (and may finish) independently of this function.
+  Future<Action> result = filler->future();
+  spawn(filler, true);
+  return result;
+}
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/consensus.hpp
----------------------------------------------------------------------
diff --git a/src/log/consensus.hpp b/src/log/consensus.hpp
new file mode 100644
index 0000000..ba41601
--- /dev/null
+++ b/src/log/consensus.hpp
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOG_CONSENSUS_HPP__
+#define __LOG_CONSENSUS_HPP__
+
+#include <stdint.h>
+
+#include <process/future.hpp>
+#include <process/shared.hpp>
+
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "log/network.hpp"
+
+#include "messages/log.hpp"
+
+// We use the Paxos consensus protocol to agree on the value of each
+// entry in the replicated log. In our system, each replica is both an
+// acceptor and a learner. There are several types of proposers in the
+// system. The coordinator is one type of proposer; it is used to append
+// new log entries. The 'log::fill' function below creates an internal
+// proposer each time it is called. These internal proposers are used
+// to agree on previously written entries in the log.
+
+namespace mesos {
+namespace internal {
+namespace log {
+
+// Executes the Paxos promise phase (also called the prepare phase).
+// The phase serves two goals. First, the proposer collects promises
+// from a quorum of replicas that they will reject writes coming from
+// proposers with lower proposal numbers. Second, the proposer
+// discovers any values that may already have been agreed on; only
+// such values may be written in the following phase. Paxos relies on
+// this restriction to guarantee that once a value has been agreed on
+// for a log position, every later write to that position carries the
+// same value. The phase can target one specific log position (an
+// "explicit" promise) or every position that has not yet been
+// promised to any proposer (an "implicit" promise). The implicit form
+// is the well-known Multi-Paxos optimization: as long as the leader
+// stays relatively stable, the promise phase can be skipped for later
+// instances of the protocol run by that same leader.
+//
+// The result of this phase is reported through a PromiseResponse,
+// whose fields are filled in as follows.
+//
+// Explicit promise: as soon as any response contains a learned
+// action, the phase completes with 'okay' set to true and 'action'
+// set to that learned action. Otherwise, once a quorum of replicas
+// has answered without a learned action, the responses are checked
+// for Nacks (i.e., refusals to promise). If at least one Nack was
+// received, 'okay' is set to false and 'proposal' is set to the
+// highest proposal number found among the Nacks. If no replica
+// answered with a Nack, 'okay' is set to true and 'action' is set to
+// the action performed by the proposer with the highest proposal
+// number among the responses; if the responses contain no action at
+// all, 'action' is left unset.
+//
+// Implicit promise: the phase always waits for a quorum of replicas
+// to reply. If at least one of them answers with a Nack, 'okay' is
+// set to false and 'proposal' is set to the highest proposal number
+// found among the Nacks. Otherwise, 'okay' is set to true and
+// 'position' is set to the highest position (i.e., the end position)
+// reported in the responses.
+process::Future<PromiseResponse> promise(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    const Option<uint64_t>& position = None());
+
+
+// Executes the Paxos write phase (also called the propose phase): the
+// proposer broadcasts a write to the replicas, and the phase succeeds
+// once a quorum of replicas has accepted it. A proposer is not allowed
+// to write before it has collected enough (i.e., a quorum of)
+// promises from the replicas. The result is reported through a
+// WriteResponse, and the phase always waits for a quorum of replicas
+// to reply. If at least one of them answers with a Nack, 'okay' is
+// set to false and 'proposal' is set to the highest proposal number
+// found among the Nacks. If no replica answered with a Nack, 'okay'
+// is set to true.
+process::Future<WriteResponse> write(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    const Action& action);
+
+
+// Executes the Paxos learn phase (also called the commit phase). The
+// phase is not strictly required and only serves as an optimization:
+// the proposer broadcasts a learned message telling the replicas that
+// consensus has already been reached for the action's log position.
+// Responses from the replicas are not awaited. Once the returned
+// future is ready, the learned message has been broadcast.
+process::Future<Nothing> learn(
+    const process::Shared<Network>& network,
+    const Action& action);
+
+
+// Attempts to reach consensus on the given log position by running a
+// complete Paxos round (promise, then write, then learn). A NOP is
+// proposed when no value has previously been agreed on for that
+// position. Whenever the supplied proposal number turns out to be too
+// low, the operation retries automatically with a bumped proposal
+// number, waiting a random delay before each retry to ensure
+// liveness. On success, the returned future holds the learned
+// action.
+process::Future<Action> fill(
+    size_t quorum,
+    const process::Shared<Network>& network,
+    uint64_t proposal,
+    uint64_t position);
+
+} // namespace log {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __LOG_CONSENSUS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/coordinator.cpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp
index 6e6466f..b2ead8e 100644
--- a/src/log/coordinator.cpp
+++ b/src/log/coordinator.cpp
@@ -18,38 +18,33 @@
#include <algorithm>
-#include <process/dispatch.hpp>
-#include <process/future.hpp>
-
-#include <stout/check.hpp>
-#include <stout/duration.hpp>
#include <stout/error.hpp>
-#include <stout/foreach.hpp>
#include <stout/none.hpp>
+#include "log/catchup.hpp"
+#include "log/consensus.hpp"
#include "log/coordinator.hpp"
-#include "log/replica.hpp"
+
+#include "messages/log.hpp"
using namespace process;
-using std::list;
-using std::pair;
using std::set;
using std::string;
-
namespace mesos {
namespace internal {
namespace log {
-Coordinator::Coordinator(int _quorum,
- Replica* _replica,
- Network* _network)
- : elected(false),
- quorum(_quorum),
+Coordinator::Coordinator( // Builds a coordinator that starts out unelected.
+ size_t _quorum,
+ const Shared<Replica>& _replica,
+ const Shared<Network>& _network)
+ : quorum(_quorum), // Initializers follow the member declaration order.
replica(_replica),
network(_network),
- id(0),
+ elected(false), // Set to true only at the end of a successful elect().
+ proposal(0), // elect() bumps this past the local replica's promise.
index(0) {}
@@ -67,94 +62,76 @@ Result<uint64_t> Coordinator::elect(const Timeout& timeout)
}
// Get the highest known promise from our local replica.
- Future<uint64_t> promise = replica->promised();
+ Future<uint64_t> promised = replica->promised();
- if (!promise.await(timeout.remaining())) {
+ if (!promised.await(timeout.remaining())) {
+ promised.discard();
return None();
- } else if (promise.isFailed()) {
- return Error(promise.failure());
+ } else if (promised.isFailed()) {
+ return Error(promised.failure());
}
- CHECK(promise.isReady()) << "Not expecting a discarded future!";
-
- id = std::max(id, promise.get()) + 1; // Try the next highest!
-
- PromiseRequest request;
- request.set_id(id);
-
- // Broadcast the request to the network.
- set<Future<PromiseResponse> > futures =
- broadcast(protocol::promise, request);
-
- uint32_t okays = 0;
-
- do {
- Future<Future<PromiseResponse> > future = select(futures);
- if (future.await(timeout.remaining())) {
- CHECK(future.get().isReady());
- const PromiseResponse& response = future.get().get();
- if (!response.okay()) {
- return None(); // Lost an election, but can retry.
- } else if (response.okay()) {
- CHECK(response.has_position());
- index = std::max(index, response.position());
- okays++;
- if (okays >= quorum) {
- break;
- }
- }
- futures.erase(future.get());
- }
- } while (timeout.remaining() > Seconds(0));
+ CHECK(promised.isReady()) << "Not expecting a discarded future!";
+
+ proposal = std::max(proposal, promised.get()) + 1; // Try the next highest!
+
+ // Run the implicit promise phase.
+ Future<PromiseResponse> promising = log::promise(quorum, network, proposal);
+
+ if (!promising.await(timeout.remaining())) {
+ promising.discard();
+ return None();
+ } else if (promising.isFailed()) {
+ return Error(promising.failure());
+ }
- // Discard the remaining futures.
- discard(futures);
+ CHECK(promising.isReady()) << "Not expecting a discarded future!";
- // Either we have a quorum or we timed out.
- if (okays >= quorum) {
+ const PromiseResponse& response = promising.get();
+ if (!response.okay()) {
+ // Lost an election, but can retry.
+ proposal = response.proposal();
+ return None();
+ } else {
LOG(INFO) << "Coordinator elected, attempting to fill missing positions";
- elected = true;
- // Need to "catchup" local replica (i.e., fill in any unlearned
+ CHECK(response.has_position());
+
+ index = response.position();
+
+ // Need to "catch up" the local replica (i.e., fill in any unlearned
// and/or missing positions) so that we can do local reads.
// Usually we could do this lazily, however, a local learned
// position might have been truncated, so we actually need to
+ // catch up the local replica all the way to the end of the log
// before we can perform any up-to-date local reads.
- Future<set<uint64_t> > positions = replica->missing(index);
+ Future<set<uint64_t> > positions = replica->missing(0, index);
if (!positions.await(timeout.remaining())) {
- elected = false;
+ positions.discard();
return None();
} else if (positions.isFailed()) {
- elected = false;
return Error(positions.failure());
}
CHECK(positions.isReady()) << "Not expecting a discarded future!";
- foreach (uint64_t position, positions.get()) {
- Result<Action> result = fill(position, timeout);
- if (result.isError()) {
- elected = false;
- return Error(result.error());
- } else if (result.isNone()) {
- elected = false;
- return None();
- } else {
- CHECK_SOME(result);
- CHECK(result.get().position() == position);
- }
+ Future<Nothing> catching =
+ log::catchup(quorum, replica, network, proposal, positions.get());
+
+ if (!catching.await(timeout.remaining())) {
+ catching.discard();
+ return None();
+ } else if (catching.isFailed()) {
+ return Error(catching.failure());
}
- index += 1;
- return index - 1;
- }
+ CHECK(catching.isReady()) << "Not expecting a discarded future!";
- // Timed out ...
- LOG(INFO) << "Coordinator timed out while trying to get elected";
- return None();
+ elected = true;
+ return index++;
+ }
}
@@ -175,8 +152,8 @@ Result<uint64_t> Coordinator::append(
Action action;
action.set_position(index);
- action.set_promised(id);
- action.set_performed(id);
+ action.set_promised(proposal);
+ action.set_performed(proposal);
action.set_type(Action::APPEND);
Action::Append* append = action.mutable_append();
append->set_bytes(bytes);
@@ -184,7 +161,7 @@ Result<uint64_t> Coordinator::append(
Result<uint64_t> result = write(action, timeout);
if (result.isSome()) {
- CHECK(result.get() == index);
+ CHECK_EQ(result.get(), index);
index++;
}
@@ -202,8 +179,8 @@ Result<uint64_t> Coordinator::truncate(
Action action;
action.set_position(index);
- action.set_promised(id);
- action.set_performed(id);
+ action.set_promised(proposal);
+ action.set_performed(proposal);
action.set_type(Action::TRUNCATE);
Action::Truncate* truncate = action.mutable_truncate();
truncate->set_to(to);
@@ -211,7 +188,7 @@ Result<uint64_t> Coordinator::truncate(
Result<uint64_t> result = write(action, timeout);
if (result.isSome()) {
- CHECK(result.get() == index);
+ CHECK_EQ(result.get(), index);
index++;
}
@@ -233,309 +210,60 @@ Result<uint64_t> Coordinator::write(
CHECK(action.has_performed());
CHECK(action.has_type());
- // TODO(benh): Eliminate this special case hack?
- if (quorum == 1) {
- Result<uint64_t> result = commit(action);
- if (result.isError()) {
- return Error(result.error());
- } else if (result.isNone()) {
- return None();
- } else {
- CHECK_SOME(result);
- return action.position();
- }
- }
-
- WriteRequest request;
- request.set_id(id);
- request.set_position(action.position());
- request.set_type(action.type());
- switch (action.type()) {
- case Action::NOP:
- CHECK(action.has_nop());
- request.mutable_nop();
- break;
- case Action::APPEND:
- CHECK(action.has_append());
- request.mutable_append()->MergeFrom(action.append());
- break;
- case Action::TRUNCATE:
- CHECK(action.has_truncate());
- request.mutable_truncate()->MergeFrom(action.truncate());
- break;
- default:
- LOG(FATAL) << "Unknown Action::Type!";
- }
-
- // Broadcast the request to the network *excluding* the local replica.
- set<Future<WriteResponse> > futures =
- remotecast(protocol::write, request);
-
- uint32_t okays = 0;
-
- do {
- Future<Future<WriteResponse> > future = select(futures);
- if (future.await(timeout.remaining())) {
- CHECK(future.get().isReady());
- const WriteResponse& response = future.get().get();
- CHECK(response.id() == request.id());
- CHECK(response.position() == request.position());
- if (!response.okay()) {
- elected = false;
- return Error("Coordinator demoted");
- } else if (response.okay()) {
- if (++okays >= (quorum - 1)) { // N.B. Using (quorum - 1) here!
- // Got enough remote okays, discard the remaining futures
- // and try and commit the action locally.
- discard(futures);
- Result<uint64_t> result = commit(action);
- if (result.isError()) {
- return Error(result.error());
- } else if (result.isNone()) {
- return None();
- } else {
- CHECK_SOME(result);
- return action.position();
- }
- }
- }
- futures.erase(future.get());
- }
- } while (timeout.remaining() > Seconds(0));
-
- // Timed out ... discard remaining futures.
- LOG(INFO) << "Coordinator timed out while attempting to write "
- << Action::Type_Name(action.type())
- << " action at position " << action.position();
- discard(futures);
- return None();
-}
-
-
-Result<uint64_t> Coordinator::commit(const Action& action)
-{
- LOG(INFO) << "Coordinator attempting to commit "
- << Action::Type_Name(action.type())
- << " action at position " << action.position();
-
- CHECK(elected);
-
- WriteRequest request;
- request.set_id(id);
- request.set_position(action.position());
- request.set_learned(true); // A commit is just a learned write.
- request.set_type(action.type());
- switch (action.type()) {
- case Action::NOP:
- CHECK(action.has_nop());
- request.mutable_nop();
- break;
- case Action::APPEND:
- CHECK(action.has_append());
- request.mutable_append()->MergeFrom(action.append());
- break;
- case Action::TRUNCATE:
- CHECK(action.has_truncate());
- request.mutable_truncate()->MergeFrom(action.truncate());
- break;
- default:
- LOG(FATAL) << "Unknown Action::Type!";
- }
-
- // TODO(benh): Add a non-message based way to do this write.
- Future<WriteResponse> future = protocol::write(replica->pid(), request);
-
- // We send a write request to the *local* replica just as the
- // others: asynchronously via messages. However, rather than add the
- // complications of dealing with timeouts for local operations
- // (especially since we are trying to commit something), we make
- // things simpler and block on the response from the local replica.
- // Maybe we can let it timeout, but consider it a failure? This
- // might be sound because we don't send the learned messages ... so
- // this should be the same as if we just failed before we even do
- // the write ... a client should just retry this write later.
+ Future<WriteResponse> writing =
+ log::write(quorum, network, proposal, action);
- future.await(); // TODO(benh): Don't wait forever, see comment above.
-
- if (future.isFailed()) {
- return Error(future.failure());
+ if (!writing.await(timeout.remaining())) {
+ writing.discard();
+ return None();
+ } else if (writing.isFailed()) {
+ return Error(writing.failure());
}
- CHECK(future.isReady()) << "Not expecting a discarded future!";
-
- const WriteResponse& response = future.get();
- CHECK(response.id() == request.id());
- CHECK(response.position() == request.position());
+ CHECK(writing.isReady()) << "Not expecting a discarded future!";
+ const WriteResponse& response = writing.get();
if (!response.okay()) {
elected = false;
+ proposal = response.proposal();
return Error("Coordinator demoted");
- }
-
- // Commit successful, send a learned message to the network
- // *excluding* the local replica and return the position.
-
- LearnedMessage message;
- message.mutable_action()->MergeFrom(action);
-
- if (!action.has_learned() || !action.learned()) {
- message.mutable_action()->set_learned(true);
- }
-
- LOG(INFO) << "Telling other replicas of learned action at position "
- << action.position();
-
- remotecast(message);
-
- return action.position();
-}
-
-
-Result<Action> Coordinator::fill(uint64_t position, const Timeout& timeout)
-{
- LOG(INFO) << "Coordinator attempting to fill position "
- << position << " in the log";
-
- CHECK(elected);
-
- PromiseRequest request;
- request.set_id(id);
- request.set_position(position);
-
- // Broadcast the request to the network.
- set<Future<PromiseResponse> > futures =
- broadcast(protocol::promise, request);
-
- list<PromiseResponse> responses;
-
- do {
- Future<Future<PromiseResponse> > future = select(futures);
- if (future.await(timeout.remaining())) {
- CHECK(future.get().isReady());
- const PromiseResponse& response = future.get().get();
- CHECK(response.id() == request.id());
- if (!response.okay()) {
- elected = false;
- return Error("Coordinator demoted");
- } else if (response.okay()) {
- responses.push_back(response);
- if (responses.size() >= quorum) {
- break;
- }
- }
- futures.erase(future.get());
- }
- } while (timeout.remaining() > Seconds(0));
-
- // Discard the remaining futures.
- discard(futures);
-
- // Either have a quorum or we timed out.
- if (responses.size() >= quorum) {
- // Check the responses for a learned action, otherwise, pick the
- // action with the higest performed id or a no-op if no responses
- // include performed actions.
- Action action;
- foreach (const PromiseResponse& response, responses) {
- if (response.has_action()) {
- CHECK(response.action().position() == position);
- if (response.action().has_learned() && response.action().learned()) {
- // Received a learned action, try and commit locally. Note
- // that there is no checking that we get the _same_ learned
- // action in the event we get multiple responses with
- // learned actions, we just take the "first". In fact, there
- // is a specific instance in which learned actions will NOT
- // be the same! In this instance, one replica may return
- // that the action is a learned no-op because it knows the
- // position has been truncated while another replica (that
- // hasn't learned the truncation yet) might return the
- // actual action at this position. Picking either action is
- // _correct_, since eventually we know this position will be
- // truncated. Fun!
- Result<uint64_t> result = commit(response.action());
- if (result.isError()) {
- return Error(result.error());
- } else if (result.isNone()) {
- return None();
- } else {
- CHECK_SOME(result);
- return response.action();
- }
- } else if (response.action().has_performed() &&
- (!action.has_performed() ||
- response.action().performed() > action.performed())) {
- action = response.action();
- }
- } else {
- CHECK(response.has_position());
- CHECK(response.position() == position);
- }
+ } else {
+ // TODO(jieyu): Currently, each log operation (append or truncate)
+ // writes the same log content to the local disk twice: once from
+ // log::write() and once from log::learn(). In the future, we may
+ // want to use checksums to eliminate the duplicate disk write.
+ Future<Nothing> learning = log::learn(network, action);
+
+ // We need to make sure that the learned message has been broadcast
+ // (i.e., enqueued). Otherwise, our "missing" check below will
+ // sometimes fail due to a race condition.
+ if (!learning.await(timeout.remaining())) {
+ learning.discard();
+ return None();
+ } else if (learning.isFailed()) {
+ return Error(learning.failure());
}
- // Use a no-op if no known action has been performed.
- if (!action.has_performed()) {
- action.set_position(position);
- action.set_promised(id);
- action.set_performed(id);
- action.set_type(Action::NOP);
- action.mutable_nop();
- } else {
- action.set_performed(id);
- }
+ CHECK(learning.isReady()) << "Not expecting a discarded future!";
- Result<uint64_t> result = write(action, timeout);
+ // Make sure that the local replica has learned the newly written
+ // log entry. Since messages are delivered and dispatched in order
+ // locally, we should always have the new entry learned by now.
+ Future<bool> checking = replica->missing(action.position());
- if (result.isError()) {
- return Error(result.error());
- } else if (result.isNone()) {
+ if (!checking.await(timeout.remaining())) {
+ checking.discard();
return None();
- } else {
- CHECK_SOME(result);
- return action;
+ } else if (checking.isFailed()) {
+ return Error(checking.failure());
}
- }
-
- // Timed out ...
- LOG(INFO) << "Coordinator timed out attempting to fill position "
- << position << " in the log";
- return None();
-}
+ CHECK(checking.isReady()) << "Not expecting a discarded future!";
-template <typename Req, typename Res>
-set<Future<Res> > Coordinator::broadcast(
- const Protocol<Req, Res>& protocol,
- const Req& req)
-{
- Future<set<Future<Res> > > futures =
- network->broadcast(protocol, req);
- futures.await();
- CHECK(futures.isReady());
- return futures.get();
-}
-
+ CHECK(!checking.get());
-template <typename Req, typename Res>
-set<Future<Res> > Coordinator::remotecast(
- const Protocol<Req, Res>& protocol,
- const Req& req)
-{
- set<UPID> filter;
- filter.insert(replica->pid());
- Future<set<Future<Res> > > futures =
- network->broadcast(protocol, req, filter);
- futures.await();
- CHECK(futures.isReady());
- return futures.get();
-}
-
-
-template <typename M>
-void Coordinator::remotecast(const M& m)
-{
- set<UPID> filter;
- filter.insert(replica->pid());
- network->broadcast(m, filter);
+ return action.position();
+ }
}
} // namespace log {
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/coordinator.hpp
----------------------------------------------------------------------
diff --git a/src/log/coordinator.hpp b/src/log/coordinator.hpp
index 3f6fb7c..b0ff8df 100644
--- a/src/log/coordinator.hpp
+++ b/src/log/coordinator.hpp
@@ -19,10 +19,11 @@
#ifndef __LOG_COORDINATOR_HPP__
#define __LOG_COORDINATOR_HPP__
+#include <stdint.h>
+
#include <string>
-#include <vector>
-#include <process/process.hpp>
+#include <process/shared.hpp>
#include <process/timeout.hpp>
#include <stout/result.hpp>
@@ -30,9 +31,6 @@
#include "log/network.hpp"
#include "log/replica.hpp"
-#include "messages/log.hpp"
-
-
namespace mesos {
namespace internal {
namespace log {
@@ -40,7 +38,10 @@ namespace log {
class Coordinator
{
public:
- Coordinator(int quorum, Replica* replica, Network* group);
+ Coordinator(
+ size_t _quorum,
+ const process::Shared<Replica>& _replica,
+ const process::Shared<Network>& _network);
~Coordinator();
@@ -65,45 +66,16 @@ public:
Result<uint64_t> truncate(uint64_t to, const process::Timeout& timeout);
private:
- // Helper that tries to achieve consensus of the specified action. A
- // result of none means the write failed (e.g., due to timeout), but
- // can be retried.
- Result<uint64_t> write(const Action& action, const process::Timeout& timeout);
-
- // Helper that handles commiting an action (i.e., writing to the
- // local replica and then sending out learned messages).
- Result<uint64_t> commit(const Action& action);
-
- // Helper that tries to fill a position in the log.
- Result<Action> fill(uint64_t position, const process::Timeout& timeout);
-
- // Helper that uses the specified protocol to broadcast a request to
- // our group and return a set of futures.
- template <typename Req, typename Res>
- std::set<process::Future<Res> > broadcast(
- const Protocol<Req, Res>& protocol,
- const Req& req);
-
- // Helper like broadcast, but excludes our local replica.
- template <typename Req, typename Res>
- std::set<process::Future<Res> > remotecast(
- const Protocol<Req, Res>& protocol,
- const Req& req);
-
- // Helper like remotecast but ignores any responses.
- template <typename M>
- void remotecast(const M& m);
-
- bool elected; // True if this coordinator has been elected.
-
- const uint32_t quorum; // Quorum size.
-
- Replica* replica; // Local log replica.
-
- Network* network; // Used to broadcast requests and messages to replicas.
+ Result<uint64_t> write(
+ const Action& action,
+ const process::Timeout& timeout);
- uint64_t id; // Coordinator ID.
+ const size_t quorum;
+ const process::Shared<Replica> replica;
+ const process::Shared<Network> network;
+ bool elected; // True if this coordinator has been elected.
+ uint64_t proposal; // Currently used proposal number.
uint64_t index; // Last position written in the log.
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/log.hpp
----------------------------------------------------------------------
diff --git a/src/log/log.hpp b/src/log/log.hpp
index 77edc7a..042f13b 100644
--- a/src/log/log.hpp
+++ b/src/log/log.hpp
@@ -23,7 +23,9 @@
#include <set>
#include <string>
+#include <process/owned.hpp>
#include <process/process.hpp>
+#include <process/shared.hpp>
#include <process/timeout.hpp>
#include <stout/check.hpp>
@@ -139,7 +141,7 @@ public:
Position ending();
private:
- Replica* replica;
+ process::Shared<Replica> replica;
};
class Writer
@@ -179,18 +181,18 @@ public:
Log(int _quorum,
const std::string& path,
const std::set<process::UPID>& pids)
- : group(NULL)
+ : group(NULL), // Fixed set of pids, so no ZooKeeper group is used.
+ executor(NULL), // Only the ZooKeeper-based constructor creates an executor.
+ quorum(_quorum),
+ replica(new Replica(path))
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
- quorum = _quorum;
+ // Add our own replica to the network.
+ Network* _network = new Network(pids);
+ _network->add(replica->pid());
- replica = new Replica(path);
-
- network = new Network(pids);
-
- // Don't forget to add our own replica!
- network->add(replica->pid());
+ network.reset(_network); // NOTE(review): add() goes through the raw pointer first, presumably because Shared<> only exposes const access.
}
// Creates a new replicated log that assumes the specified quorum
@@ -203,36 +205,34 @@ public:
const Duration& timeout,
const std::string& znode,
const Option<zookeeper::Authentication>& auth = None())
+ : group(new zookeeper::Group(servers, timeout, znode, auth)),
+ executor(new process::Executor()),
+ quorum(_quorum),
+ replica(new Replica(path)),
+ network(new ZooKeeperNetwork(servers, timeout, znode, auth))
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
- quorum = _quorum;
-
- LOG(INFO) << "Creating a new log replica";
-
- replica = new Replica(path);
-
- group = new zookeeper::Group(servers, timeout, znode, auth);
- network = new ZooKeeperNetwork(group);
-
// Need to add our replica to the ZooKeeper group!
LOG(INFO) << "Attempting to join replica to ZooKeeper group";
membership = group->join(replica->pid())
- .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+ .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
group->watch()
- .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+ .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
+ .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
}
~Log()
{
- delete network;
+ network.own().await(); // NOTE(review): presumably waits until this is the last reference; the returned Owned<> temporary then deletes the network — confirm.
+ replica.own().await(); // Same for the replica, released after the network.
+
+ delete executor; // Deleted before 'group', presumably for the same reason ZooKeeperNetwork orders its executor after its group.
delete group;
- delete replica;
}
// Returns a position based off of the bytes recovered from
@@ -261,14 +261,15 @@ private:
void failed(const std::string& message) const;
void discarded() const;
+ // We store a Group instance in order to continually renew the
+ // replica's membership (when using ZooKeeper).
zookeeper::Group* group;
process::Future<zookeeper::Group::Membership> membership;
- process::Executor executor;
+ process::Executor* executor;
int quorum;
-
- Replica* replica;
- Network* network;
+ process::Shared<Replica> replica;
+ process::Shared<Network> network;
};
@@ -420,14 +421,14 @@ void Log::watch(const std::set<zookeeper::Group::Membership>& memberships)
// Our replica's membership must have expired, join back up.
LOG(INFO) << "Renewing replica group membership";
membership = group->join(replica->pid())
- .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+ .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
}
group->watch(memberships)
- .onReady(executor.defer(lambda::bind(&Log::watch, this, lambda::_1)))
- .onFailed(executor.defer(lambda::bind(&Log::failed, this, lambda::_1)))
- .onDiscarded(executor.defer(lambda::bind(&Log::discarded, this)));
+ .onReady(executor->defer(lambda::bind(&Log::watch, this, lambda::_1)))
+ .onFailed(executor->defer(lambda::bind(&Log::failed, this, lambda::_1)))
+ .onDiscarded(executor->defer(lambda::bind(&Log::discarded, this)));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/19ad88b7/src/log/network.hpp
----------------------------------------------------------------------
diff --git a/src/log/network.hpp b/src/log/network.hpp
index d34cf78..2b674f6 100644
--- a/src/log/network.hpp
+++ b/src/log/network.hpp
@@ -33,6 +33,7 @@
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
#include "logging/logging.hpp"
@@ -67,13 +68,14 @@ public:
process::Future<std::set<process::Future<Res> > > broadcast(
const Protocol<Req, Res>& protocol,
const Req& req,
- const std::set<process::UPID>& filter = std::set<process::UPID>());
+ const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
- // Sends a message to each member of the network.
+ // Sends a message to each member of the network. The returned
+ // future is set once the message has been broadcast.
template <typename M>
- void broadcast(
+ process::Future<Nothing> broadcast(
const M& m,
- const std::set<process::UPID>& filter = std::set<process::UPID>());
+ const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
private:
// Not copyable, not assignable.
@@ -87,11 +89,19 @@ private:
class ZooKeeperNetwork : public Network
{
public:
- ZooKeeperNetwork(zookeeper::Group* group);
+ ZooKeeperNetwork(
+ const std::string& servers,
+ const Duration& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth);
private:
typedef ZooKeeperNetwork This;
+ // Not copyable, not assignable.
+ ZooKeeperNetwork(const ZooKeeperNetwork&);
+ ZooKeeperNetwork& operator = (const ZooKeeperNetwork&);
+
// Helper that sets up a watch on the group.
void watch(const std::set<zookeeper::Group::Membership>& expected);
@@ -101,9 +111,13 @@ private:
// Invoked when group members data has been collected.
void collected(const process::Future<std::list<std::string> >& datas);
- zookeeper::Group* group;
- process::Executor executor;
+ zookeeper::Group group;
process::Future<std::set<zookeeper::Group::Membership> > memberships;
+
+ // NOTE: The declaration order here is important. We want to delete
+ // the 'executor' before we delete the 'group' so that we don't get
+ // spurious fatal errors when the 'group' is being deleted.
+ process::Executor executor;
};
@@ -157,7 +171,7 @@ public:
}
template <typename M>
- void broadcast(
+ Nothing broadcast(
const M& m,
const std::set<process::UPID>& filter)
{
@@ -168,6 +182,7 @@ public:
process::post(pid, m);
}
}
+ return Nothing();
}
private:
@@ -223,7 +238,7 @@ template <typename Req, typename Res>
process::Future<std::set<process::Future<Res> > > Network::broadcast(
const Protocol<Req, Res>& protocol,
const Req& req,
- const std::set<process::UPID>& filter)
+ const std::set<process::UPID>& filter) const
{
return process::dispatch(process, &NetworkProcess::broadcast<Req, Res>,
protocol, req, filter);
@@ -231,20 +246,24 @@ process::Future<std::set<process::Future<Res> > > Network::broadcast(
template <typename M>
-void Network::broadcast(
+process::Future<Nothing> Network::broadcast(
const M& m,
- const std::set<process::UPID>& filter)
+ const std::set<process::UPID>& filter) const
{
// Need to disambiguate overloaded function.
- void (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&) =
- &NetworkProcess::broadcast<M>;
+ Nothing (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&)
+ = &NetworkProcess::broadcast<M>;
- process::dispatch(process, broadcast, m, filter);
+ return process::dispatch(process, broadcast, m, filter);
}
-inline ZooKeeperNetwork::ZooKeeperNetwork(zookeeper::Group* _group)
- : group(_group)
+inline ZooKeeperNetwork::ZooKeeperNetwork(
+ const std::string& servers,
+ const Duration& timeout,
+ const std::string& znode,
+ const Option<zookeeper::Authentication>& auth)
+ : group(servers, timeout, znode, auth)
{
watch(std::set<zookeeper::Group::Membership>());
}
@@ -253,7 +272,7 @@ inline ZooKeeperNetwork::ZooKeeperNetwork(zookeeper::Group* _group)
inline void ZooKeeperNetwork::watch(
const std::set<zookeeper::Group::Membership>& expected)
{
- memberships = group->watch(expected);
+ memberships = group.watch(expected);
memberships
.onAny(executor.defer(lambda::bind(&This::watched, this, lambda::_1)));
}
@@ -264,7 +283,7 @@ inline void ZooKeeperNetwork::watched(
{
if (memberships.isFailed()) {
// We can't do much here, we could try creating another Group but
- // that might just continue indifinitely, so we fail early
+ // that might just continue indefinitely, so we fail early
// instead. Note that Group handles all retryable/recoverable
// ZooKeeper errors internally.
LOG(FATAL) << "Failed to watch ZooKeeper group: " << memberships.failure();
@@ -278,7 +297,7 @@ inline void ZooKeeperNetwork::watched(
std::list<process::Future<std::string> > futures;
foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
- futures.push_back(group->data(membership));
+ futures.push_back(group.data(membership));
}
process::collect(futures, process::Timeout::in(Seconds(5)))