You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/25 01:58:31 UTC
[2/2] git commit: Allowed log::catchup to retry to tolerate network
blips.
Allowed log::catchup to retry to tolerate network blips.
In case of a network blip (ZooKeeper network), log::catchup could get
stuck even if all replicas re-emerge later. This is not ideal for some
callers such as log::recover.
So I introduced an optional timeout for log::catchup so that the
catch-up operation on each position is retried if the timeout expires.
From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/17286
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba1d67c7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba1d67c7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba1d67c7
Branch: refs/heads/master
Commit: ba1d67c7f5ef37ca2e0d6e577c55da4f87918f5a
Parents: 33b3afd
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 24 16:58:04 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jan 24 16:58:04 2014 -0800
----------------------------------------------------------------------
src/log/catchup.cpp | 46 ++++++++++++++++--------
src/log/catchup.hpp | 9 +++--
src/tests/log_tests.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 121 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index 46127de..69fac3c 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -21,6 +21,7 @@
#include <process/collect.hpp>
#include <process/id.hpp>
#include <process/process.hpp>
+#include <process/timer.hpp>
#include <stout/lambda.hpp>
#include <stout/stringify.hpp>
@@ -173,12 +174,14 @@ public:
const Shared<Replica>& _replica,
const Shared<Network>& _network,
uint64_t _proposal,
- const set<uint64_t>& _positions)
+ const set<uint64_t>& _positions,
+ const Duration& _timeout)
: ProcessBase(ID::generate("log-bulk-catch-up")),
quorum(_quorum),
replica(_replica),
network(_network),
positions(_positions),
+ timeout(_timeout),
proposal(_proposal) {}
virtual ~BulkCatchUpProcess() {}
@@ -214,23 +217,33 @@ private:
// Store the future so that we can discard it if the user wants to
// cancel the catch-up operation.
- catching = log::catchup(quorum, replica, network, proposal, *it);
- catching.onAny(defer(self(), &Self::caughtup));
+ catching = log::catchup(quorum, replica, network, proposal, *it)
+ .onDiscarded(defer(self(), &Self::discarded))
+ .onFailed(defer(self(), &Self::failed))
+ .onReady(defer(self(), &Self::succeeded));
+
+ Timer::create(timeout, lambda::bind(&Future<uint64_t>::discard, catching));
}
- void caughtup()
+ void discarded()
{
- // No one can discard the future 'catching' except the 'finalize'.
- CHECK(!catching.isDiscarded());
+ LOG(INFO) << "Unable to catch-up position " << *it
+ << " in " << timeout << ", retrying";
- if (catching.isFailed()) {
- promise.fail(
- "Failed to catch-up position " + stringify(*it) +
- ": " + catching.failure());
- terminate(self());
- return;
- }
+ catchup();
+ }
+ void failed()
+ {
+ promise.fail(
+ "Failed to catch-up position " + stringify(*it) +
+ ": " + catching.failure());
+
+ terminate(self());
+ }
+
+ void succeeded()
+ {
++it;
// The single position catch-up function: 'log::catchup' will
@@ -247,6 +260,7 @@ private:
const Shared<Replica> replica;
const Shared<Network> network;
const set<uint64_t> positions;
+ const Duration timeout;
uint64_t proposal;
set<uint64_t>::iterator it;
@@ -266,7 +280,8 @@ Future<Nothing> catchup(
const Shared<Replica>& replica,
const Shared<Network>& network,
const Option<uint64_t>& proposal,
- const set<uint64_t>& positions)
+ const set<uint64_t>& positions,
+ const Duration& timeout)
{
BulkCatchUpProcess* process =
new BulkCatchUpProcess(
@@ -274,7 +289,8 @@ Future<Nothing> catchup(
replica,
network,
proposal.get(0),
- positions);
+ positions,
+ timeout);
Future<Nothing> future = process->future();
spawn(process, true);
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/log/catchup.hpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp
index a2a1183..c72ed81 100644
--- a/src/log/catchup.hpp
+++ b/src/log/catchup.hpp
@@ -26,6 +26,7 @@
#include <process/future.hpp>
#include <process/shared.hpp>
+#include <stout/duration.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
@@ -40,13 +41,17 @@ namespace log {
// this function can provide a hint on the proposal number that will
// be used for Paxos. This could potentially save us a few Paxos
// rounds. However, if the user has no idea what proposal number to
-// use, he can just use none.
+// use, he can just use none. We also allow the user to specify a
+// timeout for the catch-up operation on each position and retry the
+// operation if timeout happens. This can help us tolerate network
+// blips.
extern process::Future<Nothing> catchup(
size_t quorum,
const process::Shared<Replica>& replica,
const process::Shared<Network>& network,
const Option<uint64_t>& proposal,
- const std::set<uint64_t>& positions);
+ const std::set<uint64_t>& positions,
+ const Duration& timeout = Seconds(10));
} // namespace log {
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index e493af4..b1fb200 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -39,6 +39,7 @@
#include <stout/path.hpp>
#include <stout/try.hpp>
+#include "log/catchup.hpp"
#include "log/coordinator.hpp"
#include "log/log.hpp"
#include "log/network.hpp"
@@ -1331,6 +1332,88 @@ TEST_F(RecoverTest, RacingCatchup)
}
+TEST_F(RecoverTest, CatchupRetry)
+{
+ const string path1 = os::getcwd() + "/.log1";
+ initializer.flags.path = path1;
+ initializer.execute();
+
+ const string path2 = os::getcwd() + "/.log2";
+ initializer.flags.path = path2;
+ initializer.execute();
+
+ const string path3 = os::getcwd() + "/.log3";
+
+ Shared<Replica> replica1(new Replica(path1));
+ Shared<Replica> replica2(new Replica(path2));
+
+ // Make sure replica2 does not receive learned messages.
+ DROP_MESSAGES(Eq(LearnedMessage().GetTypeName()), _, Eq(replica2->pid()));
+
+ set<UPID> pids;
+ pids.insert(replica1->pid());
+ pids.insert(replica2->pid());
+
+ Shared<Network> network1(new Network(pids));
+
+ Coordinator coord(2, replica1, network1);
+
+ {
+ Future<Option<uint64_t> > electing = coord.elect();
+ AWAIT_READY_FOR(electing, Seconds(10));
+ ASSERT_SOME(electing.get());
+ EXPECT_EQ(0u, electing.get().get());
+ }
+
+ set<uint64_t> positions;
+
+ for (uint64_t position = 1; position <= 10; position++) {
+ Future<uint64_t> appending = coord.append(stringify(position));
+ AWAIT_READY_FOR(appending, Seconds(10));
+ EXPECT_EQ(position, appending.get());
+ positions.insert(position);
+ }
+
+ Shared<Replica> replica3(new Replica(path3));
+
+ pids.insert(replica3->pid());
+
+ Shared<Network> network2(new Network(pids));
+
+ // Drop a promise request to replica1 so that the catch-up process
+ // won't be able to get a quorum of explicit promises. Also, since
+ // learned messages are blocked from being sent replica2, the
+ // catch-up process has to wait for a quorum of explicit promises.
+ // If we don't allow retry, the catch-up process will get stuck at
+ // promise phase even if replica1 reemerges later.
+ DROP_MESSAGE(Eq(PromiseRequest().GetTypeName()), _, Eq(replica1->pid()));
+
+ Future<Nothing> catching =
+ catchup(2, replica3, network2, None(), positions, Seconds(10));
+
+ Clock::pause();
+
+ // Wait for the retry timer in 'catchup' to be setup.
+ Clock::settle();
+
+ // Wait for the proposal number to be bumped.
+ Clock::advance(Seconds(1));
+ Clock::settle();
+
+ // Wait for 'catchup' to retry.
+ Clock::advance(Seconds(10));
+ Clock::settle();
+
+ // Wait for another proposal number bump.
+ Clock::advance(Seconds(1));
+ Clock::settle();
+
+ Clock::resume();
+
+ AWAIT_READY(catching);
+}
+
+
class LogTest : public TemporaryDirectoryTest
{
protected: