You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/01/25 01:58:31 UTC

[2/2] git commit: Allowed log::catchup to retry to tolerate network blips.

Allowed log::catchup to retry to tolerate network blips.

In case of a network blip (e.g., in the ZooKeeper network), log::catchup
could get stuck even if all replicas re-emerge later. This is not ideal
for some callers, such as log::recover.

I therefore introduced an optional timeout for log::catchup so that the
catch-up operation on each position is retried if the timeout expires.

From: Jie Yu <yu...@gmail.com>
Review: https://reviews.apache.org/r/17286


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba1d67c7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba1d67c7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba1d67c7

Branch: refs/heads/master
Commit: ba1d67c7f5ef37ca2e0d6e577c55da4f87918f5a
Parents: 33b3afd
Author: Benjamin Hindman <be...@gmail.com>
Authored: Fri Jan 24 16:58:04 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Fri Jan 24 16:58:04 2014 -0800

----------------------------------------------------------------------
 src/log/catchup.cpp     | 46 ++++++++++++++++--------
 src/log/catchup.hpp     |  9 +++--
 src/tests/log_tests.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 121 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/log/catchup.cpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.cpp b/src/log/catchup.cpp
index 46127de..69fac3c 100644
--- a/src/log/catchup.cpp
+++ b/src/log/catchup.cpp
@@ -21,6 +21,7 @@
 #include <process/collect.hpp>
 #include <process/id.hpp>
 #include <process/process.hpp>
+#include <process/timer.hpp>
 
 #include <stout/lambda.hpp>
 #include <stout/stringify.hpp>
@@ -173,12 +174,14 @@ public:
       const Shared<Replica>& _replica,
       const Shared<Network>& _network,
       uint64_t _proposal,
-      const set<uint64_t>& _positions)
+      const set<uint64_t>& _positions,
+      const Duration& _timeout)
     : ProcessBase(ID::generate("log-bulk-catch-up")),
       quorum(_quorum),
       replica(_replica),
       network(_network),
       positions(_positions),
+      timeout(_timeout),
       proposal(_proposal) {}
 
   virtual ~BulkCatchUpProcess() {}
@@ -214,23 +217,33 @@ private:
 
     // Store the future so that we can discard it if the user wants to
     // cancel the catch-up operation.
-    catching = log::catchup(quorum, replica, network, proposal, *it);
-    catching.onAny(defer(self(), &Self::caughtup));
+    catching = log::catchup(quorum, replica, network, proposal, *it)
+      .onDiscarded(defer(self(), &Self::discarded))
+      .onFailed(defer(self(), &Self::failed))
+      .onReady(defer(self(), &Self::succeeded));
+
+    Timer::create(timeout, lambda::bind(&Future<uint64_t>::discard, catching));
   }
 
-  void caughtup()
+  void discarded()
   {
-    // No one can discard the future 'catching' except the 'finalize'.
-    CHECK(!catching.isDiscarded());
+    LOG(INFO) << "Unable to catch-up position " << *it
+              << " in " << timeout << ", retrying";
 
-    if (catching.isFailed()) {
-      promise.fail(
-          "Failed to catch-up position " + stringify(*it) +
-          ": " + catching.failure());
-      terminate(self());
-      return;
-    }
+    catchup();
+  }
 
+  void failed()
+  {
+    promise.fail(
+        "Failed to catch-up position " + stringify(*it) +
+        ": " + catching.failure());
+
+    terminate(self());
+  }
+
+  void succeeded()
+  {
     ++it;
 
     // The single position catch-up function: 'log::catchup' will
@@ -247,6 +260,7 @@ private:
   const Shared<Replica> replica;
   const Shared<Network> network;
   const set<uint64_t> positions;
+  const Duration timeout;
 
   uint64_t proposal;
   set<uint64_t>::iterator it;
@@ -266,7 +280,8 @@ Future<Nothing> catchup(
     const Shared<Replica>& replica,
     const Shared<Network>& network,
     const Option<uint64_t>& proposal,
-    const set<uint64_t>& positions)
+    const set<uint64_t>& positions,
+    const Duration& timeout)
 {
   BulkCatchUpProcess* process =
     new BulkCatchUpProcess(
@@ -274,7 +289,8 @@ Future<Nothing> catchup(
         replica,
         network,
         proposal.get(0),
-        positions);
+        positions,
+        timeout);
 
   Future<Nothing> future = process->future();
   spawn(process, true);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/log/catchup.hpp
----------------------------------------------------------------------
diff --git a/src/log/catchup.hpp b/src/log/catchup.hpp
index a2a1183..c72ed81 100644
--- a/src/log/catchup.hpp
+++ b/src/log/catchup.hpp
@@ -26,6 +26,7 @@
 #include <process/future.hpp>
 #include <process/shared.hpp>
 
+#include <stout/duration.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
 
@@ -40,13 +41,17 @@ namespace log {
 // this function can provide a hint on the proposal number that will
 // be used for Paxos. This could potentially save us a few Paxos
 // rounds. However, if the user has no idea what proposal number to
-// use, he can just use none.
+// use, he can just use none. We also allow the user to specify a
+// timeout for the catch-up operation on each position and retry the
+// operation if timeout happens. This can help us tolerate network
+// blips.
 extern process::Future<Nothing> catchup(
     size_t quorum,
     const process::Shared<Replica>& replica,
     const process::Shared<Network>& network,
     const Option<uint64_t>& proposal,
-    const std::set<uint64_t>& positions);
+    const std::set<uint64_t>& positions,
+    const Duration& timeout = Seconds(10));
 
 } // namespace log {
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba1d67c7/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index e493af4..b1fb200 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -39,6 +39,7 @@
 #include <stout/path.hpp>
 #include <stout/try.hpp>
 
+#include "log/catchup.hpp"
 #include "log/coordinator.hpp"
 #include "log/log.hpp"
 #include "log/network.hpp"
@@ -1331,6 +1332,88 @@ TEST_F(RecoverTest, RacingCatchup)
 }
 
 
+TEST_F(RecoverTest, CatchupRetry)
+{
+  const string path1 = os::getcwd() + "/.log1";
+  initializer.flags.path = path1;
+  initializer.execute();
+
+  const string path2 = os::getcwd() + "/.log2";
+  initializer.flags.path = path2;
+  initializer.execute();
+
+  const string path3 = os::getcwd() + "/.log3";
+
+  Shared<Replica> replica1(new Replica(path1));
+  Shared<Replica> replica2(new Replica(path2));
+
+  // Make sure replica2 does not receive learned messages.
+  DROP_MESSAGES(Eq(LearnedMessage().GetTypeName()), _, Eq(replica2->pid()));
+
+  set<UPID> pids;
+  pids.insert(replica1->pid());
+  pids.insert(replica2->pid());
+
+  Shared<Network> network1(new Network(pids));
+
+  Coordinator coord(2, replica1, network1);
+
+  {
+    Future<Option<uint64_t> > electing = coord.elect();
+    AWAIT_READY_FOR(electing, Seconds(10));
+    ASSERT_SOME(electing.get());
+    EXPECT_EQ(0u, electing.get().get());
+  }
+
+  set<uint64_t> positions;
+
+  for (uint64_t position = 1; position <= 10; position++) {
+    Future<uint64_t> appending = coord.append(stringify(position));
+    AWAIT_READY_FOR(appending, Seconds(10));
+    EXPECT_EQ(position, appending.get());
+    positions.insert(position);
+  }
+
+  Shared<Replica> replica3(new Replica(path3));
+
+  pids.insert(replica3->pid());
+
+  Shared<Network> network2(new Network(pids));
+
+  // Drop a promise request to replica1 so that the catch-up process
+  // won't be able to get a quorum of explicit promises. Also, since
+  // learned messages are blocked from being sent replica2, the
+  // catch-up process has to wait for a quorum of explicit promises.
+  // If we don't allow retry, the catch-up process will get stuck at
+  // promise phase even if replica1 reemerges later.
+  DROP_MESSAGE(Eq(PromiseRequest().GetTypeName()), _, Eq(replica1->pid()));
+
+  Future<Nothing> catching =
+    catchup(2, replica3, network2, None(), positions, Seconds(10));
+
+  Clock::pause();
+
+  // Wait for the retry timer in 'catchup' to be setup.
+  Clock::settle();
+
+  // Wait for the proposal number to be bumped.
+  Clock::advance(Seconds(1));
+  Clock::settle();
+
+  // Wait for 'catchup' to retry.
+  Clock::advance(Seconds(10));
+  Clock::settle();
+
+  // Wait for another proposal number bump.
+  Clock::advance(Seconds(1));
+  Clock::settle();
+
+  Clock::resume();
+
+  AWAIT_READY(catching);
+}
+
+
 class LogTest : public TemporaryDirectoryTest
 {
 protected: