You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/21 22:28:31 UTC
[5/8] mesos git commit: Moved retry logic from RecoverProtocolProcess to RecoverProcess.
Moved retry logic from RecoverProtocolProcess to RecoverProcess.
Review: https://reviews.apache.org/r/62761/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9c25771d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9c25771d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9c25771d
Branch: refs/heads/master
Commit: 9c25771dcd1ed8dcc7696c0ceaad1d5f08033f88
Parents: 826277d
Author: Ilya Pronin <ip...@twopensource.com>
Authored: Thu Dec 21 12:49:53 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 21 12:49:53 2017 -0800
----------------------------------------------------------------------
src/log/recover.cpp | 86 ++++++++++++++++++++++++--------------------
src/log/recover.hpp | 2 +-
src/tests/log_tests.cpp | 50 ++++++++++++++++++++++++++
3 files changed, 99 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index 8dab577..2167d41 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -91,7 +91,7 @@ public:
timeout(_timeout),
terminating(false) {}
- Future<RecoverResponse> future() { return promise.future(); }
+ Future<Option<RecoverResponse>> future() { return promise.future(); }
protected:
virtual void initialize()
@@ -329,20 +329,8 @@ private:
} else if (future.isFailed()) {
promise.fail(future.failure());
terminate(self());
- } else if (future.get().isNone()) {
- // Re-run the protocol. We add a random delay before each retry
- // because we do not want to saturate the network/disk IO in
- // some cases. The delay is chosen randomly to reduce the
- // likelihood of conflicts (i.e., a replica receives a recover
- // request while it is changing its status).
- static const Duration T = Milliseconds(500);
- Duration d = T * (1.0 + (double) os::random() / RAND_MAX);
- VLOG(2) << "Didn't receive enough responses for recovery, retrying "
- << "in " << stringify(d);
-
- delay(d, self(), &Self::start);
} else {
- promise.set(future.get().get());
+ promise.set(future.get());
terminate(self());
}
}
@@ -360,11 +348,11 @@ private:
Future<Option<RecoverResponse>> chain;
bool terminating;
- process::Promise<RecoverResponse> promise;
+ process::Promise<Option<RecoverResponse>> promise;
};
-Future<RecoverResponse> runRecoverProtocol(
+Future<Option<RecoverResponse>> runRecoverProtocol(
size_t quorum,
const Shared<Network>& network,
const Metadata::Status& status,
@@ -379,7 +367,7 @@ Future<RecoverResponse> runRecoverProtocol(
autoInitialize,
timeout);
- Future<RecoverResponse> future = process->future();
+ Future<Option<RecoverResponse>> future = process->future();
spawn(process, true);
return future;
}
@@ -451,12 +439,7 @@ protected:
// Register a callback to handle user initiated discard.
promise.future().onDiscard(defer(self(), &Self::discard));
- // Check the current status of the local replica and decide if
- // recovery is needed. Recovery is needed only if the local
- // replica is not in VOTING status.
- chain = replica->status()
- .then(defer(self(), &Self::recover, lambda::_1))
- .onAny(defer(self(), &Self::finished, lambda::_1));
+ start();
}
virtual void finalize()
@@ -470,22 +453,37 @@ private:
chain.discard();
}
- Future<Nothing> recover(const Metadata::Status& status)
+ void start()
+ {
+ // Check the current status of the local replica and decide if
+ // recovery is needed. Recovery is needed only if the local
+ // replica is not in VOTING status.
+ chain = replica->status()
+ .then(defer(self(), &Self::recover, lambda::_1))
+ .onAny(defer(self(), &Self::finished, lambda::_1));
+ }
+
+ Future<bool> recover(const Metadata::Status& status)
{
LOG(INFO) << "Replica is in " << status << " status";
if (status == Metadata::VOTING) {
// No need to do recovery.
- return Nothing();
+ return true;
} else {
return runRecoverProtocol(quorum, network, status, autoInitialize)
.then(defer(self(), &Self::_recover, lambda::_1));
}
}
- Future<Nothing> _recover(const RecoverResponse& result)
+ Future<bool> _recover(const Option<RecoverResponse>& result)
{
- switch (result.status()) {
+ if (result.isNone()) {
+ // Re-run the recover protocol.
+ return false;
+ }
+
+ switch (result->status()) {
case Metadata::STARTING:
// This is the auto-initialization case. As mentioned above, we
// use a two-phase protocol to bootstrap. When the control
@@ -504,17 +502,17 @@ private:
return updateReplicaStatus(Metadata::VOTING);
case Metadata::RECOVERING:
- CHECK(result.has_begin() && result.has_end());
+ CHECK(result->has_begin() && result->has_end());
return updateReplicaStatus(Metadata::RECOVERING)
- .then(defer(self(), &Self::catchup, result.begin(), result.end()));
+ .then(defer(self(), &Self::catchup, result->begin(), result->end()));
default:
return Failure("Unexpected status returned from the recover protocol");
}
}
- Future<Nothing> catchup(uint64_t begin, uint64_t end)
+ Future<bool> catchup(uint64_t begin, uint64_t end)
{
// We reach here either because the log is empty (uninitialized),
// or the log is not empty but a previous unfinished catch-up
@@ -561,7 +559,7 @@ private:
.then(defer(self(), &Self::updateReplicaStatus, Metadata::VOTING));
}
- Future<Nothing> updateReplicaStatus(const Metadata::Status& status)
+ Future<bool> updateReplicaStatus(const Metadata::Status& status)
{
LOG(INFO) << "Updating replica status to " << status;
@@ -569,7 +567,7 @@ private:
.then(defer(self(), &Self::_updateReplicaStatus, lambda::_1, status));
}
- Future<Nothing> _updateReplicaStatus(
+ Future<bool> _updateReplicaStatus(
bool updated, const Metadata::Status& status)
{
if (!updated) {
@@ -580,24 +578,24 @@ private:
LOG(INFO) << "Successfully joined the Paxos group";
}
- return Nothing();
+ return true;
}
- Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
+ Future<bool> getReplicaOwnership(Shared<Replica> shared)
{
// Try to re-gain the ownership of the replica.
return shared.own()
.then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
}
- Future<Nothing> _getReplicaOwnership(Owned<Replica> owned)
+ Future<bool> _getReplicaOwnership(Owned<Replica> owned)
{
replica = owned;
- return Nothing();
+ return true;
}
- void finished(const Future<Nothing>& future)
+ void finished(const Future<bool>& future)
{
if (future.isDiscarded()) {
promise.discard();
@@ -605,6 +603,16 @@ private:
} else if (future.isFailed()) {
promise.fail(future.failure());
terminate(self());
+ } else if (!future.get()) {
+ // We add a random delay before each retry because we do not
+ // want to saturate the network/disk IO in some cases. The delay
+ // is chosen randomly to reduce the likelihood of conflicts
+ // (i.e., a replica receives a recover request while it is
+ // changing its status).
+ static const Duration T = Milliseconds(500);
+ Duration d = T * (1.0 + (double) os::random() / RAND_MAX);
+ VLOG(2) << "Retrying recovery in " << stringify(d);
+ delay(d, self(), &Self::start);
} else {
promise.set(replica);
terminate(self());
@@ -616,7 +624,9 @@ private:
const Shared<Network> network;
const bool autoInitialize;
- Future<Nothing> chain;
+ // The value in this future specifies whether the recovery was
+ // successful (true) or needs to be retried (false).
+ Future<bool> chain;
process::Promise<Owned<Replica>> promise;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
index da216e4..ba80535 100644
--- a/src/log/recover.hpp
+++ b/src/log/recover.hpp
@@ -34,7 +34,7 @@ namespace log {
// Runs the recover protocol. We will re-run the recover protocol if
// it cannot be finished within 'timeout'.
-process::Future<RecoverResponse> runRecoverProtocol(
+process::Future<Option<RecoverResponse>> runRecoverProtocol(
size_t quorum,
const process::Shared<Network>& network,
const Metadata::Status& status,
http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index e11be88..e13d054 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -1844,6 +1844,56 @@ TEST_F(RecoverTest, CatchupRetry)
}
+TEST_F(RecoverTest, RecoverProtocolRetry)
+{
+ const string path1 = path::join(os::getcwd(), ".log1");
+ initializer.flags.path = path1;
+ ASSERT_SOME(initializer.execute());
+
+ const string path2 = path::join(os::getcwd(), ".log2");
+ const string path3 = path::join(os::getcwd(), ".log3");
+
+ Owned<Replica> replica1(new Replica(path1));
+ Owned<Replica> replica2(new Replica(path2));
+ Owned<Replica> replica3(new Replica(path3));
+
+ set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()};
+ Shared<Network> network(new Network(pids));
+
+ Future<Owned<Replica>> recovering = recover(2, replica3, network);
+
+ Clock::pause();
+
+ // Wait for the retry timer to be set up.
+ Clock::settle();
+ ASSERT_TRUE(recovering.isPending());
+
+ // Wait for recover process to retry.
+ Clock::advance(Seconds(10));
+ Clock::settle();
+ ASSERT_TRUE(recovering.isPending());
+
+ // Remove replica 2 from the network so it can be initialized. It is safe
+ // to have non-const access to shared Network here, because all
+ // Network operations are serialized through a Process.
+ const_cast<Network&>(*network).remove(replica2->pid());
+ replica2.reset();
+
+ initializer.flags.path = path2;
+ ASSERT_SOME(initializer.execute());
+
+ replica2.reset(new Replica(path2));
+ const_cast<Network&>(*network).add(replica2->pid());
+
+ // Wait for recover process to retry again, now with 2 VOTING
+ // replicas. It should successfully finish now.
+ Clock::advance(Seconds(10));
+ Clock::resume();
+
+ AWAIT_READY(recovering);
+}
+
+
TEST_F(RecoverTest, AutoInitialization)
{
const string path1 = os::getcwd() + "/.log1";