You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/24 08:45:21 UTC

[incubator-ratis] branch master updated: RATIS-987. Infinite install snapshot (#137)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6add587  RATIS-987. Infinite install snapshot (#137)
6add587 is described below

commit 6add5871b6de8064d5340a5b0fcdf5dac12a6dd4
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jun 24 16:45:14 2020 +0800

    RATIS-987. Infinite install snapshot (#137)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 39 ++++++++++++----------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c4f9ec2..e32e071 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1251,24 +1251,29 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
             getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
 
-        stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
-            .whenComplete((reply, exception) -> {
-              if (exception != null) {
-                LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
-                    getMemberId(), exception.getMessage());
+        try {
+          stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
+              .whenComplete((reply, exception) -> {
+                if (exception != null) {
+                  LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
+                      getMemberId(), exception.getMessage());
+                  inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
+                  return;
+                }
+
+                if (reply != null) {
+                  LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
+                      getMemberId(), reply.getIndex());
+                  stateMachine.pause();
+                  state.updateInstalledSnapshotIndex(reply);
+                  state.reloadStateMachine(reply.getIndex());
+                }
                 inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
-                return;
-              }
-
-              if (reply != null) {
-                LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
-                    getMemberId(), reply.getIndex());
-                stateMachine.pause();
-                state.updateInstalledSnapshotIndex(reply);
-                state.reloadStateMachine(reply.getIndex());
-              }
-              inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
-            });
+              });
+        } catch (Throwable t) {
+          inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex, null);
+          throw t;
+        }
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Snapshot Installation Request received and is in progress", getMemberId());