You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/21 22:28:31 UTC

[5/8] mesos git commit: Moved retry logic from RecoverProtocolProcess to RecoverProcess.

Moved retry logic from RecoverProtocolProcess to RecoverProcess.

Review: https://reviews.apache.org/r/62761/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9c25771d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9c25771d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9c25771d

Branch: refs/heads/master
Commit: 9c25771dcd1ed8dcc7696c0ceaad1d5f08033f88
Parents: 826277d
Author: Ilya Pronin <ip...@twopensource.com>
Authored: Thu Dec 21 12:49:53 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Dec 21 12:49:53 2017 -0800

----------------------------------------------------------------------
 src/log/recover.cpp     | 86 ++++++++++++++++++++++++--------------------
 src/log/recover.hpp     |  2 +-
 src/tests/log_tests.cpp | 50 ++++++++++++++++++++++++++
 3 files changed, 99 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index 8dab577..2167d41 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -91,7 +91,7 @@ public:
       timeout(_timeout),
       terminating(false) {}
 
-  Future<RecoverResponse> future() { return promise.future(); }
+  Future<Option<RecoverResponse>> future() { return promise.future(); }
 
 protected:
   virtual void initialize()
@@ -329,20 +329,8 @@ private:
     } else if (future.isFailed()) {
       promise.fail(future.failure());
       terminate(self());
-    } else if (future.get().isNone()) {
-      // Re-run the protocol. We add a random delay before each retry
-      // because we do not want to saturate the network/disk IO in
-      // some cases. The delay is chosen randomly to reduce the
-      // likelihood of conflicts (i.e., a replica receives a recover
-      // request while it is changing its status).
-      static const Duration T = Milliseconds(500);
-      Duration d = T * (1.0 + (double) os::random() / RAND_MAX);
-      VLOG(2) << "Didn't receive enough responses for recovery, retrying "
-              << "in " << stringify(d);
-
-      delay(d, self(), &Self::start);
     } else {
-      promise.set(future.get().get());
+      promise.set(future.get());
       terminate(self());
     }
   }
@@ -360,11 +348,11 @@ private:
   Future<Option<RecoverResponse>> chain;
   bool terminating;
 
-  process::Promise<RecoverResponse> promise;
+  process::Promise<Option<RecoverResponse>> promise;
 };
 
 
-Future<RecoverResponse> runRecoverProtocol(
+Future<Option<RecoverResponse>> runRecoverProtocol(
     size_t quorum,
     const Shared<Network>& network,
     const Metadata::Status& status,
@@ -379,7 +367,7 @@ Future<RecoverResponse> runRecoverProtocol(
         autoInitialize,
         timeout);
 
-  Future<RecoverResponse> future = process->future();
+  Future<Option<RecoverResponse>> future = process->future();
   spawn(process, true);
   return future;
 }
@@ -451,12 +439,7 @@ protected:
     // Register a callback to handle user initiated discard.
     promise.future().onDiscard(defer(self(), &Self::discard));
 
-    // Check the current status of the local replica and decide if
-    // recovery is needed. Recovery is needed only if the local
-    // replica is not in VOTING status.
-    chain = replica->status()
-      .then(defer(self(), &Self::recover, lambda::_1))
-      .onAny(defer(self(), &Self::finished, lambda::_1));
+    start();
   }
 
   virtual void finalize()
@@ -470,22 +453,37 @@ private:
     chain.discard();
   }
 
-  Future<Nothing> recover(const Metadata::Status& status)
+  void start()
+  {
+    // Check the current status of the local replica and decide if
+    // recovery is needed. Recovery is needed only if the local
+    // replica is not in VOTING status.
+    chain = replica->status()
+      .then(defer(self(), &Self::recover, lambda::_1))
+      .onAny(defer(self(), &Self::finished, lambda::_1));
+  }
+
+  Future<bool> recover(const Metadata::Status& status)
   {
     LOG(INFO) << "Replica is in " << status << " status";
 
     if (status == Metadata::VOTING) {
       // No need to do recovery.
-      return Nothing();
+      return true;
     } else {
       return runRecoverProtocol(quorum, network, status, autoInitialize)
         .then(defer(self(), &Self::_recover, lambda::_1));
     }
   }
 
-  Future<Nothing> _recover(const RecoverResponse& result)
+  Future<bool> _recover(const Option<RecoverResponse>& result)
   {
-    switch (result.status()) {
+    if (result.isNone()) {
+      // Re-run the recover protocol.
+      return false;
+    }
+
+    switch (result->status()) {
       case Metadata::STARTING:
         // This is the auto-initialization case. As mentioned above, we
         // use a two-phase protocol to bootstrap. When the control
@@ -504,17 +502,17 @@ private:
         return updateReplicaStatus(Metadata::VOTING);
 
       case Metadata::RECOVERING:
-        CHECK(result.has_begin() && result.has_end());
+        CHECK(result->has_begin() && result->has_end());
 
         return updateReplicaStatus(Metadata::RECOVERING)
-          .then(defer(self(), &Self::catchup, result.begin(), result.end()));
+          .then(defer(self(), &Self::catchup, result->begin(), result->end()));
 
       default:
         return Failure("Unexpected status returned from the recover protocol");
     }
   }
 
-  Future<Nothing> catchup(uint64_t begin, uint64_t end)
+  Future<bool> catchup(uint64_t begin, uint64_t end)
   {
     // We reach here either because the log is empty (uninitialized),
     // or the log is not empty but a previous unfinished catch-up
@@ -561,7 +559,7 @@ private:
       .then(defer(self(), &Self::updateReplicaStatus, Metadata::VOTING));
   }
 
-  Future<Nothing> updateReplicaStatus(const Metadata::Status& status)
+  Future<bool> updateReplicaStatus(const Metadata::Status& status)
   {
     LOG(INFO) << "Updating replica status to " << status;
 
@@ -569,7 +567,7 @@ private:
       .then(defer(self(), &Self::_updateReplicaStatus, lambda::_1, status));
   }
 
-  Future<Nothing> _updateReplicaStatus(
+  Future<bool> _updateReplicaStatus(
       bool updated, const Metadata::Status& status)
   {
     if (!updated) {
@@ -580,24 +578,24 @@ private:
       LOG(INFO) << "Successfully joined the Paxos group";
     }
 
-    return Nothing();
+    return true;
   }
 
-  Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
+  Future<bool> getReplicaOwnership(Shared<Replica> shared)
   {
     // Try to re-gain the ownership of the replica.
     return shared.own()
       .then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
   }
 
-  Future<Nothing> _getReplicaOwnership(Owned<Replica> owned)
+  Future<bool> _getReplicaOwnership(Owned<Replica> owned)
   {
     replica = owned;
 
-    return Nothing();
+    return true;
   }
 
-  void finished(const Future<Nothing>& future)
+  void finished(const Future<bool>& future)
   {
     if (future.isDiscarded()) {
       promise.discard();
@@ -605,6 +603,16 @@ private:
     } else if (future.isFailed()) {
       promise.fail(future.failure());
       terminate(self());
+    } else if (!future.get()) {
+      // We add a random delay before each retry because we do not
+      // want to saturate the network/disk IO in some cases. The delay
+      // is chosen randomly to reduce the likelihood of conflicts
+      // (i.e., a replica receives a recover request while it is
+      // changing its status).
+      static const Duration T = Milliseconds(500);
+      Duration d = T * (1.0 + (double) os::random() / RAND_MAX);
+      VLOG(2) << "Retrying recovery in " << stringify(d);
+      delay(d, self(), &Self::start);
     } else {
       promise.set(replica);
       terminate(self());
@@ -616,7 +624,9 @@ private:
   const Shared<Network> network;
   const bool autoInitialize;
 
-  Future<Nothing> chain;
+  // The value in this future specifies whether the recovery was
+  // successful or we need to retry it.
+  Future<bool> chain;
 
   process::Promise<Owned<Replica>> promise;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/log/recover.hpp
----------------------------------------------------------------------
diff --git a/src/log/recover.hpp b/src/log/recover.hpp
index da216e4..ba80535 100644
--- a/src/log/recover.hpp
+++ b/src/log/recover.hpp
@@ -34,7 +34,7 @@ namespace log {
 
 // Runs the recover protocol. We will re-run the recover protocol if
 // it cannot be finished within 'timeout'.
-process::Future<RecoverResponse> runRecoverProtocol(
+process::Future<Option<RecoverResponse>> runRecoverProtocol(
     size_t quorum,
     const process::Shared<Network>& network,
     const Metadata::Status& status,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9c25771d/src/tests/log_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp
index e11be88..e13d054 100644
--- a/src/tests/log_tests.cpp
+++ b/src/tests/log_tests.cpp
@@ -1844,6 +1844,56 @@ TEST_F(RecoverTest, CatchupRetry)
 }
 
 
+TEST_F(RecoverTest, RecoverProtocolRetry)
+{
+  const string path1 = path::join(os::getcwd(), ".log1");
+  initializer.flags.path = path1;
+  ASSERT_SOME(initializer.execute());
+
+  const string path2 = path::join(os::getcwd(), ".log2");
+  const string path3 = path::join(os::getcwd(), ".log3");
+
+  Owned<Replica> replica1(new Replica(path1));
+  Owned<Replica> replica2(new Replica(path2));
+  Owned<Replica> replica3(new Replica(path3));
+
+  set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()};
+  Shared<Network> network(new Network(pids));
+
+  Future<Owned<Replica>> recovering = recover(2, replica3, network);
+
+  Clock::pause();
+
+  // Wait for the retry timer to be set up.
+  Clock::settle();
+  ASSERT_TRUE(recovering.isPending());
+
+  // Wait for recover process to retry.
+  Clock::advance(Seconds(10));
+  Clock::settle();
+  ASSERT_TRUE(recovering.isPending());
+
+  // Remove replica 2 from the network to be initialized. It is safe
+  // to have non-const access to shared Network here, because all
+  // Network operations are serialized through a Process.
+  const_cast<Network&>(*network).remove(replica2->pid());
+  replica2.reset();
+
+  initializer.flags.path = path2;
+  ASSERT_SOME(initializer.execute());
+
+  replica2.reset(new Replica(path2));
+  const_cast<Network&>(*network).add(replica2->pid());
+
+  // Wait for recover process to retry again, now with 2 VOTING
+  // replicas. It should successfully finish now.
+  Clock::advance(Seconds(10));
+  Clock::resume();
+
+  AWAIT_READY(recovering);
+}
+
+
 TEST_F(RecoverTest, AutoInitialization)
 {
   const string path1 = os::getcwd() + "/.log1";