You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2021/12/16 15:46:17 UTC

[GitHub] [ratis] szetszwo commented on a change in pull request #567: RATIS-1473. Implement takeSnapshot in Server

szetszwo commented on a change in pull request #567:
URL: https://github.com/apache/ratis/pull/567#discussion_r770457594



##########
File path: ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
##########
@@ -33,4 +33,7 @@
 
   CompletableFuture<RaftClientReply> transferLeadershipAsync(
       TransferLeadershipRequest request) throws IOException;
-}
\ No newline at end of file
+
+  CompletableFuture<RaftClientReply> takeSnapshotAsync(
+      SnapshotRequest request) throws IOException;
+}

Review comment:
       Please do not change the protocol yet.  Let's discuss how to change it.  We may want to add a snapshotManagementAsync(..) instead of individual snapshot methods.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/Snapshot.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Snapshot {

Review comment:
       Let's call it SnapshotRequestHandler.

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
##########
@@ -133,6 +134,37 @@ public void tearDown() {
     }
   }
 
+  @Test
+  public void testTakeSnapshot() throws Exception {

Review comment:
       Let's create a new test class since there are already many tests here.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
##########
@@ -305,7 +309,23 @@ private boolean shouldTakeSnapshot() {
     } else if (shouldStop()) {
       return getLastAppliedIndex() - snapshotIndex.get() > 0;
     }
-    return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
+    return state == State.RUNNING
+        && (takeSnapshot || getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold);
+  }
+
+  public void enableSnapshot() {
+    if(!takeSnapshot) {
+      takeSnapshot = true;
+      if(shouldTakeSnapshot()) {
+        takeSnapshot();
+      }
+    }
+  }

Review comment:
       This method may race with the updater thread.  The idea is to trigger the updater thread to take a snapshot, rather than taking the snapshot from another thread.
   

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -963,6 +968,51 @@ void finishTransferLeadership() {
     }
   }
 
+  public RaftClientReply takeSnapshot(SnapshotRequest request) throws IOException {
+    return waitForReply(request, takeSnapshotAsync(request));
+  }
+
+  public CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotRequest request) throws IOException {
+    LOG.info("{}: receive snapshot", getMemberId());
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    SnapshotInfo latest = stateMachine.getLatestSnapshot();
+    long lastSnapshotIndex = -1;
+    if (latest != null) {
+      lastSnapshotIndex = latest.getIndex();
+    }
+    long minGapValue = 5;
+
+    synchronized (this) {
+      long installSnapshot = inProgressInstallSnapshotRequest.get();
+      // check snapshot install/load
+      if (installSnapshot != 0) {
+        String msg = String.format("{}: Failed do snapshot as snapshot ({}) installation is in progress",
+            getMemberId(), installSnapshot);
+        LOG.warn(msg);
+        return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
+      }
+      //TODO(liuyaolong): make the min gap configurable, or get the gap value from shell command
+      if (state.getLastAppliedIndex() - lastSnapshotIndex < minGapValue) {
+        String msg = String.format("{}: Failed do snapshot as the gap between the applied index and last"
+            + " snapshot index is less than {}", getMemberId(), minGapValue);
+        LOG.warn(msg);
+        return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
+      }
+      setFinishSnapshot(false);
+      state.getStateMachineUpdater().enableSnapshot();

Review comment:
       This is not async.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/Snapshot.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Snapshot {
+  public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
+
+  class PendingRequest {
+    private final SnapshotRequest request;
+    private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+    PendingRequest(SnapshotRequest request) {
+      this.request = request;
+    }
+
+    SnapshotRequest getRequest() {
+      return request;
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    void complete(boolean timeout) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+      if (server.getFinishSnapshot()) {
+        LOG.info("Successfully take snapshot on server {}",server.getRaftServer().getId());
+        replyFuture.complete(server.newSuccessReply(request));
+      } else if(timeout) {
+        String msg = ": Failed to take snapshot ON " + request.getServerId()
+              + " (timed out " + request.getTimeoutMs() + "ms)";
+        final RaftException ex = new RaftException(msg);
+        replyFuture.complete(server.newExceptionReply(request, ex));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return request.toString();
+    }
+  }
+
+  private final RaftServerImpl server;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
+
+  Snapshot(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  boolean isSteppingDown() {
+    return pending.get() != null;
+  }
+
+  CompletableFuture<RaftClientReply> start(SnapshotRequest request) {
+    final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
+    final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
+    if (previous != null) {
+      final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+      previous.getReplyFuture().whenComplete((r, e) -> {
+        if (e != null) {
+          replyFuture.completeExceptionally(e);
+        } else {
+          replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
+                : server.newExceptionReply(request, r.getException()));
+        }
+      });
+      return replyFuture;

Review comment:
       Just return previous.getReplyFuture().




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org