Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2021/12/16 08:20:14 UTC

[GitHub] [ratis] codings-dan opened a new pull request #567: RATIS-1473.Implement takeSnapshot in Server

codings-dan opened a new pull request #567:
URL: https://github.com/apache/ratis/pull/567


   ## What changes were proposed in this pull request?
   
   This is a subtask of "Support snapshot command": implement takeSnapshot in the server.
   
   ## What is the link to the Apache JIRA
   https://issues.apache.org/jira/browse/RATIS-1473
   
   ## How was this patch tested?
   Unit test (UT).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ratis] codings-dan commented on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan commented on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-995887654


   @szetszwo I implemented the server-side code and wrote a UT. Could you help review it? Thanks!



[GitHub] [ratis] szetszwo commented on a change in pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #567:
URL: https://github.com/apache/ratis/pull/567#discussion_r772171343



##########
File path: ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftTakeSnapshotWithGrpc.java
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.RaftTakeSnapshotTest;
+
+public class TestRaftTakeSnapshotWithGrpc extends RaftTakeSnapshotTest {

Review comment:
       Please remove rpc test.  We will add it with the rpc change later.

##########
File path: ratis-test/src/test/java/org/apache/ratis/netty/TestRaftTakeSnapshotWithNetty.java
##########
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.RaftTakeSnapshotTest;
+
+public class TestRaftTakeSnapshotWithNetty extends RaftTakeSnapshotTest {

Review comment:
       Please remove rpc test.  We will add it with the rpc change later.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
##########
@@ -468,6 +472,10 @@ long getLastAppliedIndex() {
     return stateMachineUpdater.getStateMachineLastAppliedIndex();
   }
 
+  StateMachineUpdater getStateMachineUpdater() {
+    return stateMachineUpdater;
+  }

Review comment:
       Remove getStateMachineUpdater() since it is not used.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
##########
@@ -253,7 +260,6 @@ private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Messag
       if (futures.isInitialized()) {
         JavaUtils.allOf(futures.get()).get();
       }
-

Review comment:
       Please revert this unrelated change.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -169,6 +170,7 @@ public long getLastAppliedIndex() {
   private final AtomicLong inProgressInstallSnapshotRequest;
   private final AtomicLong installedSnapshotIndex;
   private final AtomicBoolean isSnapshotNull;
+  private final AtomicBoolean finishSnapshot;

Review comment:
       Remove finishSnapshot and the related methods since finishSnapshot is not useful.

##########
File path: ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftTakeSnapshotWithHadoopRpc.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.RaftTakeSnapshotTest;
+
+public class TestRaftTakeSnapshotWithHadoopRpc extends RaftTakeSnapshotTest {

Review comment:
       Please remove rpc test.  We will add it with the rpc change later.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -963,6 +975,49 @@ void finishTransferLeadership() {
     }
   }
 
+  public RaftClientReply takeSnapshot(SnapshotRequest request) throws IOException {
+    return waitForReply(request, takeSnapshotAsync(request));
+  }

Review comment:
       Remove takeSnapshot(..) since it is not used.

##########
File path: ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
##########
@@ -33,4 +33,4 @@
 
   CompletableFuture<RaftClientReply> transferLeadershipAsync(
       TransferLeadershipRequest request) throws IOException;
-}
\ No newline at end of file
+}

Review comment:
       Please revert this unrelated change.

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
##########
@@ -112,7 +112,9 @@ public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception
 
   private MiniRaftCluster cluster;
 
-  public abstract MiniRaftCluster.Factory<?> getFactory();
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return null;
+  }

Review comment:
       Please revert this unrelated change.

##########
File path: ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRaftTakeSnapshotWithSimulatedRpc.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.RaftTakeSnapshotTest;
+
+public class TestRaftTakeSnapshotWithSimulatedRpc extends RaftTakeSnapshotTest{

Review comment:
       Please remove rpc test.  We will add it with the rpc change later.

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/RaftTakeSnapshotTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class RaftTakeSnapshotTest extends BaseTest {

Review comment:
       Rename it to SnapshotManagementTest.  We will add other new tests later.

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/RaftTakeSnapshotTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class RaftTakeSnapshotTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(RaftTakeSnapshotTest.class);
+  private MiniRaftCluster cluster;
+
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return null;

Review comment:
       Return MiniRaftClusterWithSimulatedRpc.FACTORY for now so that we can remove all the subclasses.





[GitHub] [ratis] codings-dan commented on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan commented on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-998401466


   @szetszwo After this patch is merged, should we do the following next?
   1. Add more unit tests
   2. Add rpc and proto
   3. Add shell command
   Is there anything else?



[GitHub] [ratis] codings-dan commented on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan commented on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-997703133


   @szetszwo Thanks for reviewing the code. I have changed the code according to your comments; PTAL again. In addition, you practically helped refactor the code logic, and I benefited a lot. Thanks again.



[GitHub] [ratis] codings-dan commented on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan commented on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-997762296


   The CI test seems to be unstable, and the reported error is unrelated to this code change. PTAL, thanks!



[GitHub] [ratis] szetszwo merged pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #567:
URL: https://github.com/apache/ratis/pull/567


   



[GitHub] [ratis] szetszwo commented on a change in pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #567:
URL: https://github.com/apache/ratis/pull/567#discussion_r772263509



##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
+  private MiniRaftCluster cluster;

Review comment:
       Let's declare the class as
   ```
   public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
       extends BaseTest
       implements MiniRaftCluster.Factory.Get<CLUSTER> {
   ```
   Then, it will take care of the cluster start and shutdown.
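
A minimal sketch of the pattern this comment describes: a test class implements a factory-getter interface, and a default method on that interface owns the cluster lifecycle. The types here (`DemoCluster`, `FactoryGet`, `TestBody`, `runWithNewCluster`) are hypothetical stand-ins written for illustration; they are not the Ratis classes, and the real `MiniRaftCluster.Factory.Get` may differ in its details.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT the Ratis classes.
final class ClusterLifecycleSketch {
  /** A toy cluster that records its lifecycle events. */
  static class DemoCluster {
    final List<String> events = new ArrayList<>();
    void start() { events.add("start"); }
    void shutdown() { events.add("shutdown"); }
  }

  /** A test body that is allowed to throw. */
  interface TestBody<C> {
    void accept(C cluster) throws Exception;
  }

  /** Plays the role of a factory getter that a test class implements. */
  interface FactoryGet<C extends DemoCluster> {
    C newCluster(int numServers);

    /** Starts a new cluster, runs the test body, and always shuts the cluster down. */
    default C runWithNewCluster(int numServers, TestBody<C> body) throws Exception {
      final C cluster = newCluster(numServers);
      cluster.start();
      try {
        body.accept(cluster);
      } finally {
        cluster.shutdown();
      }
      return cluster;
    }
  }

  /** The test class only supplies the factory; the lifecycle comes from the interface. */
  static class DemoTest implements FactoryGet<DemoCluster> {
    @Override
    public DemoCluster newCluster(int numServers) {
      return new DemoCluster();
    }
  }

  static List<String> runDemo() throws Exception {
    final DemoCluster c = new DemoTest().runWithNewCluster(1, cluster -> cluster.events.add("test"));
    return c.events;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runDemo()); // prints [start, test, shutdown]
  }
}
```

With this shape, an individual test no longer needs its own `cluster` field or a `try`/`finally` around `start()` and `shutdown()`.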

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
+  private MiniRaftCluster cluster;
+
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY;
+  }
+
+  @Test
+  public void testTakeSnapshot() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+          SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, false);
+    cluster = getFactory().newCluster(1,prop);
+    cluster.start();
+
+    int i = 0;
+    try {
+      final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = leader.getId();
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        //todo(yaolong liu) : make 5 to be a configurable value
+        for (; i < 5; i++) {
+          RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+        final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
+              CallId.getAndIncrement(), 3000);
+        RaftServerTestUtil.takeSnapshotAsync(leader, r);
+      }
+
+      // wait for the snapshot to be done
+      long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);

Review comment:
       We should get the log index from the reply.
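
One way to read this comment, as a hedged sketch: the log's next index can keep advancing after the snapshot is taken, whereas an index carried in the reply stays fixed at the snapshot point. `SnapshotReply` and `ToyServer` below are hypothetical stand-ins, not the Ratis API, and the assumption that the reply exposes the index through `getLogIndex()` is made only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins for illustration only; not the Ratis API.
final class ReplyIndexSketch {
  /** A reply that records the log index at which the snapshot was taken. */
  static final class SnapshotReply {
    private final boolean success;
    private final long logIndex;

    SnapshotReply(boolean success, long logIndex) {
      this.success = success;
      this.logIndex = logIndex;
    }

    boolean isSuccess() { return success; }
    long getLogIndex() { return logIndex; }
  }

  /** A toy server whose log may keep growing after a snapshot. */
  static final class ToyServer {
    private final AtomicLong nextIndex = new AtomicLong();

    void append() { nextIndex.incrementAndGet(); }
    long getNextIndex() { return nextIndex.get(); }

    /** The snapshot covers all entries so far, so its index is nextIndex - 1. */
    SnapshotReply takeSnapshot() {
      return new SnapshotReply(true, nextIndex.get() - 1);
    }
  }

  /** Returns {index from the reply, next index read from the log afterwards}. */
  static long[] demo() {
    final ToyServer server = new ToyServer();
    for (int i = 0; i < 5; i++) {
      server.append();
    }
    final SnapshotReply reply = server.takeSnapshot();
    server.append(); // the log advances after the snapshot was taken
    return new long[] {reply.getLogIndex(), server.getNextIndex()};
  }

  public static void main(String[] args) {
    final long[] r = demo();
    System.out.println("snapshotIndex=" + r[0] + ", nextIndex=" + r[1]); // prints snapshotIndex=4, nextIndex=6
  }
}
```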

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
+  private MiniRaftCluster cluster;
+
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY;
+  }
+
+  @Test
+  public void testTakeSnapshot() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+          SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, false);
+    cluster = getFactory().newCluster(1,prop);
+    cluster.start();
+
+    int i = 0;
+    try {
+      final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = leader.getId();
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        //todo(yaolong liu) : make 5 to be a configurable value
+        for (; i < 5; i++) {
+          RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+        final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
+              CallId.getAndIncrement(), 3000);
+        RaftServerTestUtil.takeSnapshotAsync(leader, r);
+      }
+
+      // wait for the snapshot to be done
+      long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+
+      final List<File> snapshotFiles = LongStream.range(0, nextIndex)
+            .mapToObj(j ->
+                 SimpleStateMachine4Testing
+                        .get(cluster.getLeader())
+                        .getStateMachineStorage()
+                        .getSnapshotFile(cluster.getLeader().getInfo().getCurrentTerm(), j))
+            .collect(Collectors.toList());
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertTrue(snapshotFiles.stream().anyMatch(File::exists));
+        return null;
+      }, 100, ONE_SECOND, "snapshotFile.exist", LOG);

Review comment:
       Once we have the snapshot index, we can check only the expected file.
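
A rough sketch of checking a single expected file instead of scanning a range of candidate indices. The file-name scheme `snapshot.<term>_<index>` and the helpers `expectedSnapshotFile` and `waitForFile` are assumptions made for this example; the test in the diff obtains the file through `getStateMachineStorage().getSnapshotFile(term, index)` instead.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Illustrative sketch; the naming scheme and helper names are assumptions.
final class ExpectedSnapshotFileSketch {
  /** Assumed naming scheme for this example: snapshot.<term>_<index>. */
  static File expectedSnapshotFile(File dir, long term, long index) {
    return new File(dir, "snapshot." + term + "_" + index);
  }

  /** Polls until the file exists or the attempts run out. */
  static boolean waitForFile(File file, int attempts, long sleepMs) throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      if (file.exists()) {
        return true;
      }
      Thread.sleep(sleepMs);
    }
    return file.exists();
  }

  /** Simulates a snapshot at (term=1, index=4) and checks only that one file. */
  static boolean demo() throws IOException, InterruptedException {
    final File dir = Files.createTempDirectory("snapshot-sketch").toFile();
    final File expected = expectedSnapshotFile(dir, 1, 4);
    try {
      Files.createFile(expected.toPath()); // stands in for the state machine writing the snapshot
      return waitForFile(expected, 10, 10L);
    } finally {
      expected.delete();
      dir.delete();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("found expected snapshot file: " + demo());
  }
}
```

Knowing the exact index turns the assertion from "some snapshot file exists" into "the snapshot for this request exists", which is a stricter check.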

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
+  private MiniRaftCluster cluster;
+
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY;
+  }
+
+  @Test
+  public void testTakeSnapshot() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+          SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, false);
+    cluster = getFactory().newCluster(1,prop);
+    cluster.start();
+
+    int i = 0;
+    try {
+      final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = leader.getId();
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        //todo(yaolong liu) : make 5 to be a configurable value
+        for (; i < 5; i++) {
+          RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+        final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
+              CallId.getAndIncrement(), 3000);
+        RaftServerTestUtil.takeSnapshotAsync(leader, r);

Review comment:
       We should get the future and wait for it.
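
A minimal sketch of that suggestion, assuming a stand-in for the asynchronous call (the class WaitForSnapshotReply and its methods are hypothetical and not part of Ratis): keep the returned CompletableFuture and block on it with a bounded timeout, so that a failed or stuck snapshot surfaces in the test itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WaitForSnapshotReply {
  // Hypothetical stand-in for RaftServerTestUtil.takeSnapshotAsync(leader, r);
  // it completes on another thread, with true meaning success.
  static CompletableFuture<Boolean> takeSnapshotAsync() {
    return CompletableFuture.supplyAsync(() -> true);
  }

  // Keep the returned future and block on it with a bounded timeout,
  // so that a failed or stuck snapshot is reported right here.
  static boolean takeSnapshotAndWait(long timeoutMs) {
    final CompletableFuture<Boolean> future = takeSnapshotAsync();
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("snapshot did not complete", e);
    }
  }

  public static void main(String[] args) {
    System.out.println("snapshot succeeded: " + takeSnapshotAndWait(3000));
  }
}
```

In the real test the future would presumably carry a RaftClientReply, and the assertion would be on reply.isSuccess().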

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {

Review comment:
       Since the class is abstract, we need to add a concrete subclass, TestSnapshotManagementWithSimulatedRpc; otherwise, the test won't be executed.  Sorry that I asked you to remove the SimulatedRpc subclass earlier.
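
A rough illustration of the point, using hypothetical stand-in classes rather than the real Ratis test classes: an abstract class cannot be instantiated, so the test methods it defines only run through a concrete subclass.

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for the abstract SnapshotManagementTest.
abstract class AbstractSnapshotTestSketch {
  // Each concrete subclass chooses the RPC flavour (a plain string in this sketch).
  abstract String rpcType();

  // Shared test logic lives in the abstract base class.
  String testTakeSnapshot() {
    return "takeSnapshot ran with " + rpcType();
  }
}

// Hypothetical stand-in for TestSnapshotManagementWithSimulatedRpc.
final class SimulatedRpcSnapshotTestSketch extends AbstractSnapshotTestSketch {
  @Override
  String rpcType() {
    return "SimulatedRpc";
  }
}

final class AbstractTestDemo {
  // A class can only be instantiated, and hence run as a test, if it is not abstract.
  static boolean isInstantiable(Class<?> c) {
    return !Modifier.isAbstract(c.getModifiers());
  }

  public static void main(String[] args) {
    System.out.println(isInstantiable(AbstractSnapshotTestSketch.class));
    System.out.println(isInstantiable(SimulatedRpcSnapshotTestSketch.class));
    System.out.println(new SimulatedRpcSnapshotTestSketch().testTakeSnapshot());
  }
}
```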







[GitHub] [ratis] codings-dan commented on a change in pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan commented on a change in pull request #567:
URL: https://github.com/apache/ratis/pull/567#discussion_r772304054



##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/SnapshotManagementTest.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.rpc.CallId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Log4jUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+public abstract class SnapshotManagementTest extends BaseTest {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+    Log4jUtils.setLogLevel(RaftLog.LOG, Level.INFO);
+    Log4jUtils.setLogLevel(RaftClient.LOG, Level.INFO);
+  }
+
+  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
+  private MiniRaftCluster cluster;
+
+  public MiniRaftCluster.Factory<?> getFactory() {
+    return MiniRaftClusterWithSimulatedRpc.FACTORY;
+  }
+
+  @Test
+  public void testTakeSnapshot() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+          SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, false);
+    cluster = getFactory().newCluster(1,prop);
+    cluster.start();
+
+    int i = 0;
+    try {
+      final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = leader.getId();
+      try(final RaftClient client = cluster.createClient(leaderId)) {
+        //todo(yaolong liu) : make 5 to be a configurable value
+        for (; i < 5; i++) {
+          RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
+        }
+        final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
+              CallId.getAndIncrement(), 3000);
+        RaftServerTestUtil.takeSnapshotAsync(leader, r);
+      }
+
+      // wait for the snapshot to be done
+      long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
+      LOG.info("nextIndex = {}", nextIndex);
+
+      final List<File> snapshotFiles = LongStream.range(0, nextIndex)
+            .mapToObj(j ->
+                 SimpleStateMachine4Testing
+                        .get(cluster.getLeader())
+                        .getStateMachineStorage()
+                        .getSnapshotFile(cluster.getLeader().getInfo().getCurrentTerm(), j))
+            .collect(Collectors.toList());
+      JavaUtils.attemptRepeatedly(() -> {
+        Assert.assertTrue(snapshotFiles.stream().anyMatch(File::exists));
+        return null;
+      }, 100, ONE_SECOND, "snapshotFile.exist", LOG);

Review comment:
       Good change; the unit test is more accurate now.







[GitHub] [ratis] szetszwo commented on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-998446494


   The tasks look good.  We should also add some unit tests for the shell commands.





[GitHub] [ratis] codings-dan edited a comment on pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
codings-dan edited a comment on pull request #567:
URL: https://github.com/apache/ratis/pull/567#issuecomment-998401466


   @szetszwo After this patch is merged, should we do the following next?
   1. Add more unit tests
   2. Add RPC and proto
   3. Add shell command

   Is there anything else?





[GitHub] [ratis] szetszwo commented on a change in pull request #567: RATIS-1473.Implement takeSnapshot in Server

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #567:
URL: https://github.com/apache/ratis/pull/567#discussion_r770457594



##########
File path: ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
##########
@@ -33,4 +33,7 @@
 
   CompletableFuture<RaftClientReply> transferLeadershipAsync(
       TransferLeadershipRequest request) throws IOException;
-}
\ No newline at end of file
+
+  CompletableFuture<RaftClientReply> takeSnapshotAsync(
+      SnapshotRequest request) throws IOException;
+}

Review comment:
       Please do not change the protocol yet.  Let's discuss how to change it.  We may want to add a snapshotManagementAsync(..) instead of individual snapshot methods.
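
One possible shape for such a method, purely as a sketch for discussion: every type below (Op, Request, AdminProtocol, Server) is hypothetical, and replies are plain strings to keep the example self-contained. The idea is that a single protocol method receives a request that names the operation, so later snapshot operations would not require further protocol changes.

```java
import java.util.concurrent.CompletableFuture;

final class SnapshotManagementSketch {
  // Hypothetical operation kinds; only CREATE corresponds to this pull request.
  enum Op { CREATE }

  // Hypothetical request type carrying the operation and its timeout.
  static final class Request {
    final Op op;
    final long timeoutMs;

    Request(Op op, long timeoutMs) {
      this.op = op;
      this.timeoutMs = timeoutMs;
    }
  }

  // One protocol method for all snapshot operations.
  interface AdminProtocol {
    CompletableFuture<String> snapshotManagementAsync(Request request);
  }

  // Toy server that dispatches on the operation carried by the request.
  static final class Server implements AdminProtocol {
    @Override
    public CompletableFuture<String> snapshotManagementAsync(Request request) {
      switch (request.op) {
        case CREATE:
          return CompletableFuture.completedFuture("snapshot created");
        default:
          throw new IllegalArgumentException("unsupported op: " + request.op);
      }
    }
  }
}
```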

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/Snapshot.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Snapshot {

Review comment:
       Let's call it SnapshotRequestHandler.

##########
File path: ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
##########
@@ -133,6 +134,37 @@ public void tearDown() {
     }
   }
 
+  @Test
+  public void testTakeSnapshot() throws Exception {

Review comment:
       Let's create a new test class since there are already many tests here.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
##########
@@ -305,7 +309,23 @@ private boolean shouldTakeSnapshot() {
     } else if (shouldStop()) {
       return getLastAppliedIndex() - snapshotIndex.get() > 0;
     }
-    return state == State.RUNNING && getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold;
+    return state == State.RUNNING
+        && (takeSnapshot || getLastAppliedIndex() - snapshotIndex.get() >= autoSnapshotThreshold);
+  }
+
+  public void enableSnapshot() {
+    if(!takeSnapshot) {
+      takeSnapshot = true;
+      if(shouldTakeSnapshot()) {
+        takeSnapshot();
+      }
+    }
+  }

Review comment:
       This method may race with the updater thread.  The idea is to trigger the updater thread to take the snapshot, not to take it from another thread.
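
A minimal sketch of that idea, assuming a much simplified updater loop (the class UpdaterSketch and all of its members are hypothetical, not the real StateMachineUpdater): the caller only records a request and gets a future back, while the snapshot work itself happens inside the updater thread's own loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

final class UpdaterSketch implements Runnable {
  // At most one outstanding snapshot request, shared between threads.
  private final AtomicReference<CompletableFuture<Long>> snapshotRequest = new AtomicReference<>();
  private volatile boolean running = true;
  private long appliedIndex = 0;  // touched only by the updater thread

  // Called from any thread: records the request and returns immediately.
  CompletableFuture<Long> requestSnapshot() {
    final CompletableFuture<Long> created = new CompletableFuture<>();
    return snapshotRequest.updateAndGet(f -> f != null ? f : created);
  }

  @Override
  public void run() {
    while (running) {
      appliedIndex++;  // stand-in for applying one log entry
      final CompletableFuture<Long> request = snapshotRequest.getAndSet(null);
      if (request != null) {
        // The snapshot itself would be taken here, on the updater thread.
        request.complete(appliedIndex);
      }
      LockSupport.parkNanos(1_000_000L);  // avoid spinning at full speed
    }
  }

  void stop() {
    running = false;
  }

  // Starts an updater thread, requests one snapshot, and returns its index.
  static long demo() {
    final UpdaterSketch updater = new UpdaterSketch();
    final Thread thread = new Thread(updater, "updater-sketch");
    thread.setDaemon(true);
    thread.start();
    try {
      return updater.requestSnapshot().join();
    } finally {
      updater.stop();
    }
  }
}
```

Because only the updater thread reads and writes appliedIndex, no extra locking is needed around the snapshot step in this sketch.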
   

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -963,6 +968,51 @@ void finishTransferLeadership() {
     }
   }
 
+  public RaftClientReply takeSnapshot(SnapshotRequest request) throws IOException {
+    return waitForReply(request, takeSnapshotAsync(request));
+  }
+
+  public CompletableFuture<RaftClientReply> takeSnapshotAsync(SnapshotRequest request) throws IOException {
+    LOG.info("{}: receive snapshot", getMemberId());
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    SnapshotInfo latest = stateMachine.getLatestSnapshot();
+    long lastSnapshotIndex = -1;
+    if (latest != null) {
+      lastSnapshotIndex = latest.getIndex();
+    }
+    long minGapValue = 5;
+
+    synchronized (this) {
+      long installSnapshot = inProgressInstallSnapshotRequest.get();
+      // check snapshot install/load
+      if (installSnapshot != 0) {
+        String msg = String.format("{}: Failed do snapshot as snapshot ({}) installation is in progress",
+            getMemberId(), installSnapshot);
+        LOG.warn(msg);
+        return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
+      }
+      //TODO(liuyaolong): make the min gap configurable, or get the gap value from shell command
+      if (state.getLastAppliedIndex() - lastSnapshotIndex < minGapValue) {
+        String msg = String.format("{}: Failed do snapshot as the gap between the applied index and last"
+            + " snapshot index is less than {}", getMemberId(), minGapValue);
+        LOG.warn(msg);
+        return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
+      }
+      setFinishSnapshot(false);
+      state.getStateMachineUpdater().enableSnapshot();

Review comment:
       This is not async: enableSnapshot() may call takeSnapshot() directly in the calling thread.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/Snapshot.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.SnapshotRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Snapshot {
+  public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
+
+  class PendingRequest {
+    private final SnapshotRequest request;
+    private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+    PendingRequest(SnapshotRequest request) {
+      this.request = request;
+    }
+
+    SnapshotRequest getRequest() {
+      return request;
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    void complete(boolean timeout) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+      if (server.getFinishSnapshot()) {
+        LOG.info("Successfully take snapshot on server {}",server.getRaftServer().getId());
+        replyFuture.complete(server.newSuccessReply(request));
+      } else if(timeout) {
+        String msg = ": Failed to take snapshot ON " + request.getServerId()
+              + " (timed out " + request.getTimeoutMs() + "ms)";
+        final RaftException ex = new RaftException(msg);
+        replyFuture.complete(server.newExceptionReply(request, ex));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return request.toString();
+    }
+  }
+
+  private final RaftServerImpl server;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
+
+  Snapshot(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  boolean isSteppingDown() {
+    return pending.get() != null;
+  }
+
+  CompletableFuture<RaftClientReply> start(SnapshotRequest request) {
+    final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
+    final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
+    if (previous != null) {
+      final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+      previous.getReplyFuture().whenComplete((r, e) -> {
+        if (e != null) {
+          replyFuture.completeExceptionally(e);
+        } else {
+          replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
+                : server.newExceptionReply(request, r.getException()));
+        }
+      });
+      return replyFuture;

Review comment:
       Just return previous.getReplyFuture().
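
A sketch of the simplified logic, with hypothetical names and a String in place of RaftClientReply: when a request is already pending, later callers receive the very same future instead of a new one chained onto it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class PendingSketch {
  private final AtomicReference<CompletableFuture<String>> pending = new AtomicReference<>();

  // Returns the future of the pending request, creating one if none exists.
  CompletableFuture<String> start() {
    final CompletableFuture<String> created = new CompletableFuture<>();
    final CompletableFuture<String> previous = pending.getAndUpdate(f -> f != null ? f : created);
    // Share the existing future rather than wrapping it in a new one.
    return previous != null ? previous : created;
  }

  // Completes and clears the pending request, if there is one.
  void finish(String reply) {
    final CompletableFuture<String> f = pending.getAndSet(null);
    if (f != null) {
      f.complete(reply);
    }
  }
}
```

One trade-off is that every caller then sees the reply built for the first request, which may or may not matter for fields such as the call id.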



