Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/08/31 23:18:10 UTC

[GitHub] [incubator-ratis] amaliujia opened a new pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

amaliujia opened a new pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188


   ## What changes were proposed in this pull request?
   Support `pause()` in `RaftServerImpl`. It first transitions the life-cycle state from RUNNING to PAUSING, then pauses ongoing operations (e.g. the state machine), and finally transitions the state from PAUSING to PAUSED.
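
The three-step transition described above can be sketched as follows. This is only a minimal illustration under stated assumptions: `PauseLifeCycleSketch`, its `State` enum, and the `stateMachinePaused` flag are hypothetical stand-ins, not the Ratis `LifeCycle` or `StateMachine` classes.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a server life cycle; not the Ratis LifeCycle class. */
class PauseLifeCycleSketch {
  enum State { RUNNING, PAUSING, PAUSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
  private boolean stateMachinePaused = false; // guarded by "this"

  /** Atomically move from {@code from} to {@code to}; return false if the current state differs. */
  boolean compareAndTransition(State from, State to) {
    return state.compareAndSet(from, to);
  }

  /** Pause in three steps: RUNNING -> PAUSING, pause ongoing work, PAUSING -> PAUSED. */
  synchronized boolean pause() {
    if (!compareAndTransition(State.RUNNING, State.PAUSING)) {
      return false; // not running, so there is nothing to pause
    }
    stateMachinePaused = true; // placeholder for pausing the state machine
    return compareAndTransition(State.PAUSING, State.PAUSED);
  }

  State getState() {
    return state.get();
  }

  synchronized boolean isStateMachinePaused() {
    return stateMachinePaused;
  }

  public static void main(String[] args) {
    PauseLifeCycleSketch server = new PauseLifeCycleSketch();
    System.out.println("pause: " + server.pause() + ", state: " + server.getState());
  }
}
```

A second call to `pause()` on the same object returns false, because the state is no longer RUNNING.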
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-960
   
   ## How was this patch tested?
   
   Unit Tests.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693663407


   @szetszwo 
   
   Sounds good! I will change this PR to a local call for now. If we decide in the future to make it an RPC call, it will be very easy to add that API on top of the local call.





[GitHub] [incubator-ratis] szetszwo merged pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188


   





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-688457676


   Friendly ping to raise attention on this PR.





[GitHub] [incubator-ratis] szetszwo commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693637927


   https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185454&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17185454
   By #2 of the comment above, a follower itself detects whether it is lagging behind.  If so, it will pause itself and install a snapshot.  So pause() is only a local call.  No?
   





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-690896647


   @szetszwo  Can you take a look please? Thanks!





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494659436



##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);

Review comment:
       We should use SimpleMessage.create(int numMessages, String prefix) to create batch1 and batch2 with different prefixes.
   
   







[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693647105


   > https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185454&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17185454
   > By # 2 of the comment above, a follower detects itself if it is lagging behind. 
   
   This is correct.
   
   > If yes, it will pause itself and install snapshot. So pause() is only a local call. No?
   
   This is not decided. Right now the leader, the follower, and the application could all make the pause/install-snapshot call. For the leader and the application, it has to be an RPC call. If the follower takes responsibility for catching up, it should be a local call.
   
   I am leaning toward letting either the leader or the application (which needs a client change) handle this process. That's why I am adding this as an RPC call.
   
   Another reason to keep it as an RPC call: it seems fine to me to leave room for applications to pause a node in the future, e.g. so that people can pause a node and manually log in to inspect its status.
   
   
   
   





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494480462



##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, messages));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain less messages because it was paused already.
+    Assert.assertTrue(followerLog.getEntries(0, messages.length).length < messages.length);

Review comment:
       This is incorrect since there are entries other than the messages.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited on only working for a follower?
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+        lifeCycle.compareAndTransition(STARTING, PAUSED);

Review comment:
       The life cycle cannot transition from STARTING to PAUSED.  It must transition to EXCEPTION in this case.
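
The point about the failure path can be sketched as follows. Everything here is a hypothetical stand-in: `ResumeFailureSketch`, its `State` enum, and the `Reinitializer` hook are invented for illustration. The STARTING -> RUNNING step on success is also an assumption, since the quoted diff stops before the success path.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of resume(); not the RaftServerImpl implementation. */
class ResumeFailureSketch {
  enum State { PAUSED, STARTING, RUNNING, EXCEPTION }

  /** Hypothetical hook standing in for a state machine's reinitialize step. */
  interface Reinitializer {
    void reinitialize() throws IOException;
  }

  private final AtomicReference<State> state = new AtomicReference<>(State.PAUSED);
  private final Reinitializer reinitializer;

  ResumeFailureSketch(Reinitializer reinitializer) {
    this.reinitializer = reinitializer;
  }

  synchronized boolean resume() {
    if (!state.compareAndSet(State.PAUSED, State.STARTING)) {
      return false; // only a paused server can be resumed
    }
    try {
      reinitializer.reinitialize();
    } catch (IOException e) {
      // Do not go back to PAUSED: a failed start ends in EXCEPTION.
      state.compareAndSet(State.STARTING, State.EXCEPTION);
      return false;
    }
    // Assumption: a successful reinitialize completes the start.
    return state.compareAndSet(State.STARTING, State.RUNNING);
  }

  State getState() {
    return state.get();
  }

  public static void main(String[] args) {
    ResumeFailureSketch failing = new ResumeFailureSketch(() -> {
      throw new IOException("simulated reinitialize failure");
    });
    System.out.println("resume: " + failing.resume() + ", state: " + failing.getState());
  }
}
```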

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);

Review comment:
       Create two sets of messages.  Send one set before the pause and then send the other set after the pause.  The paused follower should have all messages from the first set but none from the second set.
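
A self-contained sketch of the two-batch check, assuming an in-memory stand-in for a follower. `TwoBatchPauseSketch`, `FakeFollower`, `createMessages`, and `replicate` are hypothetical names that do not come from `RaftTestUtil` or `MiniRaftCluster`. The fake log also holds one non-message entry, which is why the checks compare message content rather than entry counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of the two-batch pause test; no Ratis classes are used. */
class TwoBatchPauseSketch {
  /** Fake follower that appends entries only while it is not paused. */
  static class FakeFollower {
    private final List<String> log = new ArrayList<>();
    private boolean paused = false;

    FakeFollower() {
      log.add("configuration-entry"); // a log entry that is not a client message
    }

    void pause() {
      paused = true;
    }

    void append(String entry) {
      if (!paused) {
        log.add(entry);
      }
    }

    int size() {
      return log.size();
    }

    boolean containsAll(String[] messages) {
      return log.containsAll(Arrays.asList(messages));
    }

    boolean containsNone(String[] messages) {
      return Arrays.stream(messages).noneMatch(log::contains);
    }
  }

  /** Create n messages named prefix0, prefix1, ... so that two batches never collide. */
  static String[] createMessages(int n, String prefix) {
    String[] messages = new String[n];
    for (int i = 0; i < n; i++) {
      messages[i] = prefix + i;
    }
    return messages;
  }

  /** Stand-in for the leader replicating a batch to the follower. */
  static void replicate(FakeFollower follower, String[] messages) {
    for (String m : messages) {
      follower.append(m);
    }
  }

  public static void main(String[] args) {
    FakeFollower follower = new FakeFollower();
    String[] batch1 = createMessages(3, "before-pause-");
    String[] batch2 = createMessages(3, "after-pause-");

    replicate(follower, batch1);
    follower.pause();
    replicate(follower, batch2);

    System.out.println("has all of batch1: " + follower.containsAll(batch1));
    System.out.println("has none of batch2: " + follower.containsNone(batch2));
    System.out.println("entries in log: " + follower.size());
  }
}
```

Because of the extra non-message entry, the log size alone does not say how many messages were replicated, so a size comparison would be misleading here.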

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {

Review comment:
       Let's move this to a new test since Pause-Resume is not a basic feature.  :)

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+

Review comment:
       Sleep for some time before comparing the log.







[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693647105


   > https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185454&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17185454
   > By # 2 of the comment above, a follower detects itself if it is lagging behind. 
   
   This is correct.
   
   > If yes, it will pause itself and install snapshot. So pause() is only a local call. No?
   
   This is not decided. Right now the leader, the follower, and the application could all drive the pause/install-snapshot/resume process. For the leader and the application, it has to be an RPC call. If the follower takes responsibility for catching up, it should be a local call.
   
   I am leaning toward letting either the leader or the application (which needs a client change) handle this process. That's why I am adding this as an RPC call.
   
   Another reason to keep it as an RPC call: it seems fine to me to leave room for applications to pause a node in the future, e.g. so that people can pause a node and manually log in to inspect its status.
   
   
   
   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494669145



##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);

Review comment:
       Ack

##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+    writeThread.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    SimpleMessage[] batch2 = SimpleMessage.create(100);
+    Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+    writeThread2.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // paused follower should not have any batch2 message in its raftlog.
+    Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+  }

Review comment:
       sure.







[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-698556805


   @szetszwo  comments addressed. Can you take another look?





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r483156528



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1177,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();
+    }
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+    }

Review comment:
       So:
   
   1. Changing the state to PAUSING will (or at least should) prevent new operations, e.g. new AppendEntries.
   2. `synchronized (this)` guarantees that operations which acquired the lock before this pause code block are able to finish.
   
   However, my current thought is that there could still be a race condition: when pause and other operations compete for this lock and pause wins, the other operations will still be executed after the pause, which is wrong.
   
   So the resolution could be to add an extra LifeCycle check after an operation (e.g. AppendEntries) enters `synchronized (this)`, to make sure no other operations happen after a pause.
   
   WDYT?
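
The proposed extra check can be sketched as follows. `PauseRaceSketch` and its methods are hypothetical and only model the locking idea: both the life-cycle transition and the append path run under the same monitor, and the append path re-checks the state after it acquires the lock.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the pause/append race; not the RaftServerImpl code. */
class PauseRaceSketch {
  enum State { RUNNING, PAUSING, PAUSED }

  private State state = State.RUNNING; // guarded by "this"
  private final List<String> log = new ArrayList<>(); // guarded by "this"

  /** Transition and pause under the same lock that appendEntries uses. */
  synchronized boolean pause() {
    if (state != State.RUNNING) {
      return false;
    }
    state = State.PAUSING;
    // placeholder: pause the state machine and other ongoing work here
    state = State.PAUSED;
    return true;
  }

  /**
   * Re-check the life cycle after acquiring the lock, so that an append
   * which lost the race to pause() is rejected instead of executed.
   */
  synchronized boolean appendEntries(String entry) {
    if (state != State.RUNNING) {
      return false;
    }
    log.add(entry);
    return true;
  }

  synchronized int logSize() {
    return log.size();
  }

  public static void main(String[] args) {
    PauseRaceSketch server = new PauseRaceSketch();
    System.out.println("append before pause: " + server.appendEntries("a"));
    System.out.println("pause: " + server.pause());
    System.out.println("append after pause: " + server.appendEntries("b"));
  }
}
```

With the transition inside the synchronized block, an append either completes before the pause starts or observes a non-RUNNING state and is rejected.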










[GitHub] [incubator-ratis] runzhiwang commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684105687


   @amaliujia Thanks for your work.  Regarding `follower.pause`: do you mean this pause method should be called by the leader?  When should the leader do this?





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684092035


   @szetszwo 
   
   can you take a look and give some suggestions?





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r493860728



##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,22 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseFollowers() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseFollowers);
+  }
+
+  void runTestPauseFollowers(CLUSTER cluster) throws Exception {

Review comment:
       Sounds good. I have updated this test to also test resume. I introduced a thread that sends messages through a client while a follower is paused and resumed, to verify that pause/resume works well.







[GitHub] [incubator-ratis] szetszwo commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693651431


   > This is not decided.
   
   If it is not yet decided, we must wait before changing the code.  Otherwise, we may change it in the wrong way.
   
   Let's just make the change for what is already decided?





[GitHub] [incubator-ratis] hanishakoneru commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
hanishakoneru commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r488103720



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
##########
@@ -67,4 +69,8 @@ public B setServer(RaftServer raftServer) {
   /** The server role changes from leader to a non-leader role. */
   default void notifyNotLeader(RaftGroupId groupId) {
   }
+
+  default PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    throw new UnsupportedOperationException("pause() is not supported!");

Review comment:
       Nitpick: Can we just say "pause is not supported"? I think that would be better than saying that the specific function is not supported.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();

Review comment:
       Do we need the assert check here? lifeCycle.compareAndTransition would check the current state anyway before changing it.
   It comes down to whether we want to throw an error if the current state is not RUNNING, or return false as the response.
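
The difference between the two checks can be illustrated with a small stand-in. This is only a sketch: `ToyLifeCycle`, its `State` enum and `assertState` are hypothetical names, not the Ratis `LifeCycle` class. A compare-and-transition reports a wrong starting state through its return value, while an assert-style check throws.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a life cycle; not the Ratis LifeCycle class. */
final class ToyLifeCycle {
  enum State { RUNNING, PAUSING, PAUSED }

  private final AtomicReference<State> current = new AtomicReference<>(State.RUNNING);

  /** Atomically moves from 'from' to 'to'; returns false if the current state is not 'from'. */
  boolean compareAndTransition(State from, State to) {
    return current.compareAndSet(from, to);
  }

  /** Throws if the current state is not the expected one. */
  void assertState(State expected) {
    State actual = current.get();
    if (actual != expected) {
      throw new IllegalStateException("expected " + expected + " but was " + actual);
    }
  }

  State get() {
    return current.get();
  }
}
```

With only the compare-and-transition, a caller that is not in RUNNING gets `false` back; adding the assert in front turns the same situation into an exception before the transition is even attempted.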

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1177,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();
+    }
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+    }

Review comment:
       Agree. Also, the lifecycle transition should happen within the synchronized block.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494657425



##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+    writeThread.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+    RaftLog followerLog = follower.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    SimpleMessage[] batch2 = SimpleMessage.create(100);
+    Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+    writeThread2.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // paused follower should not have any batch2 message in its raftlog.
+    Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+  }

Review comment:
       We should call resume() and see if the follower can get batch2.







[GitHub] [incubator-ratis] runzhiwang commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684163270


   @hanishakoneru cc.








[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494480462



##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, messages));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain less messages because it was paused already.
+    Assert.assertTrue(followerLog.getEntries(0, messages.length).length < messages.length);

Review comment:
       This is incorrect since there are entries other than the messages.
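
To illustrate the point, here is a toy log model. `Kind`, `Entry` and both helper methods are hypothetical names, not Ratis types. It assumes the log holds entries that are not client messages, such as configuration entries, so counting entries in an index range says little about how many messages arrived.

```java
import java.util.List;

/** Toy log model; Kind and Entry are hypothetical, not Ratis types. */
final class LogCountDemo {
  enum Kind { CONFIGURATION, MESSAGE }

  static final class Entry {
    final Kind kind;
    final String payload;

    Entry(Kind kind, String payload) {
      this.kind = kind;
      this.payload = payload;
    }
  }

  /** Counts only the entries that carry a client message. */
  static long countMessages(List<Entry> log) {
    return log.stream().filter(e -> e.kind == Kind.MESSAGE).count();
  }

  /** True if some MESSAGE entry in the log has exactly this payload. */
  static boolean containsMessage(List<Entry> log, String payload) {
    return log.stream().anyMatch(e -> e.kind == Kind.MESSAGE && e.payload.equals(payload));
  }
}
```

Checking for the message contents, as `containsMessage` does here, is independent of how many other entries sit in the log.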

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited on only working for a follower?
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+        lifeCycle.compareAndTransition(STARTING, PAUSED);

Review comment:
       The life cycle cannot transition from STARTING to PAUSED.  It must transition to EXCEPTION in this case.

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);

Review comment:
       Create two sets of messages.  Send one set before pause and then send the other set after pause.  The paused follower should have all messages from the first set but none from the second set.

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {

Review comment:
       Let's move this to a new test since Pause-Resume is not a basic feature.  :)

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,46 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    // keep sending messages to the leader.
+    SimpleMessage[] messages = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    writeThread.join();
+

Review comment:
       Sleep for some time before comparing the log.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited on only working for a follower?
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+        lifeCycle.compareAndTransition(STARTING, PAUSED);

Review comment:
       It should also rethrow the exception.
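
Taken together with the earlier comment on the same line (move to EXCEPTION rather than back to PAUSED), the failure path could look roughly like the sketch below. It is not the Ratis code: `ResumeSketch`, its `State` enum and the `Reinitializable` interface are hypothetical stand-ins, and the set of states is reduced to the ones needed here.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of resume(); names mirror the diff but this is not the Ratis code. */
final class ResumeSketch {
  enum State { PAUSED, STARTING, RUNNING, EXCEPTION }

  /** Stand-in for the state machine hook that resume() calls. */
  interface Reinitializable {
    void reinitialize() throws IOException;
  }

  private final AtomicReference<State> lifeCycle = new AtomicReference<>(State.PAUSED);
  private final Reinitializable stateMachine;

  ResumeSketch(Reinitializable stateMachine) {
    this.stateMachine = stateMachine;
  }

  State getState() {
    return lifeCycle.get();
  }

  synchronized boolean resume() throws IOException {
    if (!lifeCycle.compareAndSet(State.PAUSED, State.STARTING)) {
      return false; // not paused, so there is nothing to resume
    }
    try {
      stateMachine.reinitialize();
    } catch (IOException e) {
      // STARTING -> PAUSED is not a legal transition, so record the failure instead
      lifeCycle.compareAndSet(State.STARTING, State.EXCEPTION);
      // and rethrow so the caller sees why resume() failed
      throw e;
    }
    lifeCycle.compareAndSet(State.STARTING, State.RUNNING);
    return true;
  }
}
```

Rethrowing means `resume()` has to declare `IOException`, so a `false` return is left to mean only "the server was not paused".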

##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+    writeThread.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+    RaftLog followerLog = follower.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    SimpleMessage[] batch2 = SimpleMessage.create(100);
+    Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+    writeThread2.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // paused follower should not have any batch2 message in its raftlog.
+    Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+  }

Review comment:
       We should call resume() and see if the follower can get batch2.

##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);

Review comment:
       We should use SimpleMessage.create(int numMessages, String prefix) to create batch1 and batch2 with different prefixes.
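
A toy example of why the prefixes matter. `createMessages` and `logNotContains` below are hypothetical stand-ins for `SimpleMessage.create` and `RaftTestUtil.logEntriesNotContains`, and the sketch assumes the single-argument `create` builds every batch from one fixed prefix. Under that assumption batch1 and batch2 have identical contents, so a "log does not contain batch2" check cannot pass once batch1 is in the log.

```java
import java.util.List;

/** Toy illustration; createMessages is a hypothetical stand-in for SimpleMessage.create. */
final class BatchPrefixDemo {
  /** Builds messages named prefix0, prefix1, ... */
  static String[] createMessages(int numMessages, String prefix) {
    String[] messages = new String[numMessages];
    for (int i = 0; i < numMessages; i++) {
      messages[i] = prefix + i;
    }
    return messages;
  }

  /** True if none of the given messages appears in the log. */
  static boolean logNotContains(List<String> log, String[] messages) {
    for (String message : messages) {
      if (log.contains(message)) {
        return false;
      }
    }
    return true;
  }
}
```

With distinct prefixes, for example "batch1-" and "batch2-", the two sets never collide and the negative check only fails if a batch2 message really reached the paused follower.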
   
   







[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-697993849


   @szetszwo  
   
   Have addressed your comments. Can you take another look?





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684112568


   @runzhiwang `PAUSING` and `PAUSED` states are in the LifeCycle, so technically any role (leader, follower, candidate) can be paused.
   
   However, the current known usage is to pause a lagging follower and install a snapshot to help the follower catch up (e.g. lagging OM in Ozone HA mode or lagging followers on Ozone DataNode). 
   
   That's why I left a comment saying `pause()` might only matter for a follower now. 
   
   Regarding who will call the `pause()`, it is not decided yet and I will have a design for it (relevant context: https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185513&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17185513)





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693773598


   @szetszwo  
   
   Updated to a local call implementation.





[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684112568


   @runzhiwang `PAUSING` and `PAUSED` states are in the LifeCycle so technically speaking, any role (leader, follower, candidate) can be paused.
   
   However, the current known usage is to pause a lagging follower and install a snapshot to help the follower catch up (e.g. lagging OM in Ozone HA mode or lagging followers on Ozone DataNode). 
   
   That's why I left a comment saying `pause()` might only matter for a follower now. 
   
   Regarding who will call the `pause()`, it is not decided yet and I will have a design for it (relevant context: https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185513&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17185513)





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r488288904



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();

Review comment:
       I see. I removed the assert check.







[GitHub] [incubator-ratis] szetszwo edited a comment on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693637927


   https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185454&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17185454
   By #2 of the comment above, a follower detects by itself whether it is lagging behind.  If so, it will pause itself and install a snapshot.  So pause() is only a local call, no?
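
A rough sketch of that follower-driven flow, with everything happening inside one process. None of these names are Ratis APIs: `SelfPausingFollower`, `catchUpIfLagging` and the lag threshold are hypothetical, and installing a snapshot is reduced to advancing an index.

```java
/** Hypothetical follower-side catch-up; none of these names are Ratis APIs. */
final class SelfPausingFollower {
  enum State { RUNNING, PAUSED }

  private final long lagThreshold;
  private State state = State.RUNNING;
  private long appliedIndex;

  SelfPausingFollower(long appliedIndex, long lagThreshold) {
    this.appliedIndex = appliedIndex;
    this.lagThreshold = lagThreshold;
  }

  synchronized State getState() {
    return state;
  }

  synchronized long getAppliedIndex() {
    return appliedIndex;
  }

  /** Local call: succeeds only from RUNNING. */
  synchronized boolean pause() {
    if (state != State.RUNNING) {
      return false;
    }
    state = State.PAUSED;
    return true;
  }

  /** Local call: succeeds only from PAUSED. */
  synchronized boolean resume() {
    if (state != State.PAUSED) {
      return false;
    }
    state = State.RUNNING;
    return true;
  }

  /** Pauses, "installs" a snapshot and resumes if the follower is too far behind. */
  synchronized boolean catchUpIfLagging(long leaderCommitIndex, long snapshotIndex) {
    if (leaderCommitIndex - appliedIndex <= lagThreshold) {
      return false; // close enough; normal log replication will do
    }
    if (!pause()) {
      return false;
    }
    // stand-in for installing the snapshot while paused
    appliedIndex = Math.max(appliedIndex, snapshotIndex);
    resume();
    return true;
  }
}
```

Because the follower makes the decision itself, `pause()` and `resume()` are plain method calls here and no request or reply message is involved.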
   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r493859930



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,23 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {

Review comment:
       Makes sense. Added resume() as well in this PR.







[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684092182


   Also @runzhiwang would you mind giving some suggestions?





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693587023


   @szetszwo 
   
   I am following what was discussed in https://issues.apache.org/jira/browse/RATIS-624. 
   
   I have created 4 JIRAs to achieve 4 smaller goals. This PR is the first one. The RPC usage should be addressed by RATIS-1068.
   
   Does my pace in tackling the slow follower catch-up make sense to you?  
   
   





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684092182


   Also @runzhiwang would you mind giving some suggestions?





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-690896647


   @szetszwo  Can you take a look please? Thanks!





[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693647105


   @szetszwo 
   
   > https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185454&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17185454
   > By #2 of the comment above, a follower detects by itself whether it is lagging behind. 
   
   This is correct.
   
   > If so, it will pause itself and install a snapshot. So pause() is only a local call, no?
   
   This is not decided. Right now the leader, the follower and the application could each drive the pause/install snapshot/resume process. For the leader and the application, it has to be an RPC call. If the follower takes responsibility for catching up, it should be a local call.
   
   I am leaning towards letting either the leader or the application (which needs a client change) handle this process. That's why I am adding this as an RPC call. 
   
   Another reason to keep it as an RPC call is that it seems fine to me to leave room for applications to pause a node in the future, e.g. people could pause a node and manually log in to inspect the node's status.
   
   
   So do you think it is better to let the follower do all the work? In that case the follower needs a `notifyLeaderToSendInstallSnapshot`.
   
   





[GitHub] [incubator-ratis] szetszwo merged pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188


   





[GitHub] [incubator-ratis] szetszwo commented on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-693212391


   The new pause() method is only for local use, not an RPC.  Or do you have another plan?





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r493415470



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -358,6 +360,10 @@ public boolean isLeader() {
     return role.isLeader();
   }
 
+  public boolean isActive() {
+    return !lifeCycle.getCurrentState().isPausingOrPaused();
+  }

Review comment:
       When the state is CLOSED, isActive() returns true.  It does not sound right.  Let's rename this method to isPausingOrPaused?
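
   A minimal sketch of the naming problem described above. The `State` enum here is a hypothetical, simplified stand-in and not the actual Ratis `LifeCycle` class; only `isPausingOrPaused` is a name taken from the diff.

```java
/** Hypothetical, simplified stand-in for a server life cycle; not the Ratis LifeCycle class. */
final class LifeCycleNamingSketch {
  enum State {
    NEW, STARTING, RUNNING, PAUSING, PAUSED, CLOSING, CLOSED;

    /** True only while the server is being paused or is already paused. */
    boolean isPausingOrPaused() {
      return this == PAUSING || this == PAUSED;
    }
  }

  /** The definition from the diff: "active" simply means "not pausing or paused". */
  static boolean isActive(State s) {
    return !s.isPausingOrPaused();
  }

  public static void main(String[] args) {
    // Print both predicates for every state so the CLOSED case is visible.
    for (State s : State.values()) {
      System.out.println(s + ": isActive=" + isActive(s)
          + ", isPausingOrPaused=" + s.isPausingOrPaused());
    }
  }
}
```

   With this definition a CLOSED (or NEW) server is reported as active, which is why a name that says exactly what is checked is less misleading.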

##########
File path: ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
##########
@@ -337,6 +337,22 @@ public void testWithLoad() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster -> testWithLoad(10, 300, false, cluster, LOG));
   }
 
+  @Test
+  public void testPauseFollowers() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseFollowers);
+  }
+
+  void runTestPauseFollowers(CLUSTER cluster) throws Exception {

Review comment:
       The test is too simple for the pause() feature.  The test should keep sending requests, then call pause() and make sure that requests sent after the pause are not executed.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,23 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {

Review comment:
       pause() and resume() should be implemented together.  Otherwise, the feature is incomplete. 







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r483156528



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1177,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();
+    }
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+    }

Review comment:
       So:
   
   1. Changing the state to PAUSING will (or at least should) prevent new operations, e.g. new AppendEntries.
   2. `synchronized (this)` guarantees that operations which acquired the lock before this pause block are able to finish.
   
   However, my current thought is that there could still be a race condition: when pause and other operations compete for this lock and pause wins, the other operations will still be executed after the pause, which is wrong.
   
   So the resolution could be to add an extra LifeCycle check after, e.g., AppendEntries enters `synchronized (this)`, to make sure no pause has happened.
   
   WDYT?
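
   A rough sketch of the extra check proposed above, under stated assumptions: `PauseRaceSketch`, `appendEntry`, and the three-value `State` enum are hypothetical names used only for illustration, and an `AtomicReference` stands in for the real `LifeCycle`. The point is only that the state is read again after the lock has been acquired.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; not the RaftServerImpl code. */
final class PauseRaceSketch {
  enum State { RUNNING, PAUSING, PAUSED }

  private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
  private int appended = 0;

  /** Returns true if the entry was applied, false if it was rejected because of a pause. */
  boolean appendEntry() {
    // Check outside the lock: rejects requests that arrive after a pause has begun.
    if (state.get() != State.RUNNING) {
      return false;
    }
    synchronized (this) {
      // Re-check under the lock: pause() may have won the lock after the first check.
      if (state.get() != State.RUNNING) {
        return false;
      }
      appended++;
      return true;
    }
  }

  /** Returns true if this call moved the server from RUNNING to PAUSED. */
  boolean pause() {
    synchronized (this) {
      if (!state.compareAndSet(State.RUNNING, State.PAUSING)) {
        return false;
      }
      // Stand-in for pausing ongoing work such as the state machine.
      return state.compareAndSet(State.PAUSING, State.PAUSED);
    }
  }

  synchronized int appendedCount() {
    return appended;
  }

  State currentState() {
    return state.get();
  }
}
```

   An operation that lost the race for the lock sees the non-RUNNING state on the second check and returns without doing any work.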







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r493860008



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -358,6 +360,10 @@ public boolean isLeader() {
     return role.isLeader();
   }
 
+  public boolean isActive() {
+    return !lifeCycle.getCurrentState().isPausingOrPaused();
+  }

Review comment:
       SG. Updated.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494669145



##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);

Review comment:
       Ack

##########
File path: ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test server pause and resume. */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait leader be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertTrue(followers.size() >= 1);
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100);
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+    writeThread.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain all logs.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+    // pause follower.
+    boolean isSuccess = follower.pause();
+    Assert.assertTrue(isSuccess);
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    SimpleMessage[] batch2 = SimpleMessage.create(100);
+    Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+    writeThread2.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // paused follower should not have any batch2 message in its raftlog.
+    Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+  }

Review comment:
       sure.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r494486255



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1181,41 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited on only working for a follower?
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false;
+      }
+      // TODO: any other operations that needs to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString());
+        lifeCycle.compareAndTransition(STARTING, PAUSED);

Review comment:
       It should also rethrow the exception.
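
   A minimal sketch of what rolling back and rethrowing could look like. `ResumeRethrowSketch`, the `Reinitializable` interface, and the reduced `State` enum are hypothetical stand-ins for illustration only, not the Ratis classes, and the final STARTING to RUNNING transition is an assumption because the quoted hunk ends before that point.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; not the RaftServerImpl code. */
final class ResumeRethrowSketch {
  enum State { STARTING, RUNNING, PAUSED }

  /** Stand-in for the part of a state machine that resume() needs. */
  interface Reinitializable {
    void reinitialize() throws IOException;
  }

  private final AtomicReference<State> state = new AtomicReference<>(State.PAUSED);
  private final Reinitializable stateMachine;

  ResumeRethrowSketch(Reinitializable stateMachine) {
    this.stateMachine = stateMachine;
  }

  /** Returns false if the server was not PAUSED; throws if reinitialization fails. */
  synchronized boolean resume() throws IOException {
    if (!state.compareAndSet(State.PAUSED, State.STARTING)) {
      return false;
    }
    try {
      stateMachine.reinitialize();
    } catch (IOException e) {
      // Roll the state back, then let the caller see the failure.
      state.compareAndSet(State.STARTING, State.PAUSED);
      throw e;
    }
    // Assumed final transition; not shown in the quoted diff.
    state.compareAndSet(State.STARTING, State.RUNNING);
    return true;
  }

  State currentState() {
    return state.get();
  }
}
```

   Rethrowing lets the caller distinguish "was not paused" (a `false` return) from "failed to reinitialize" (an exception), which a logged warning alone does not.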







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#discussion_r488288695



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
##########
@@ -1171,6 +1177,26 @@ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto req
     return reply;
   }
 
+  @Override
+  public PauseReplyProto pause(PauseRequestProto request) throws IOException {
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    // TODO: should pause() be limited on only working for a follower?
+    if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+      return PauseReplyProto.newBuilder().setSuccess(false).build();
+    }
+
+    // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+    // Pause() should pause ongoing operations:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      // TODO: any other operations that needs to be paused?
+      stateMachine.pause();
+    }

Review comment:
       Make sense!







[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-698556805









[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #188: RATIS-1043. Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684112568


   @runzhiwang `PAUSING` and `PAUSED` states are in the LifeCycle, so technically speaking any role (leader, follower, candidate) that can be in a `RUNNING` state can be paused.
   
   However, the only currently known usage is to pause a lagging follower and install a snapshot to help the follower catch up (e.g. a lagging OM in Ozone HA mode or lagging followers on an Ozone DataNode).
   
   That's why I left a comment saying `pause()` might only matter for a follower for now.
   
   Regarding who will call `pause()`, it is not decided yet and I will write a design for it (relevant context: https://issues.apache.org/jira/browse/RATIS-624?focusedCommentId=17185513&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17185513)





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684161345


   @runzhiwang 
   
   I believe when to call `pause`, and who should call it, is still not decided. I hope to have a concrete design after 1) pause()/resume() is implemented and 2) there is an implementation for lag detection.
   
   However, I at least think `pause->take/install snapshot->resume` should be managed by either the leader or the application, not by the follower.
   





[GitHub] [incubator-ratis] amaliujia commented on pull request #188: RATIS-1043. Add Pause/Resume to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-698642627


   @szetszwo comment addressed.





[GitHub] [incubator-ratis] runzhiwang commented on pull request #188: [RATIS-1043] Add Pause() to RaftServerImpl

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #188:
URL: https://github.com/apache/incubator-ratis/pull/188#issuecomment-684125141


   > In Ozone, once an OM realizes that it is lagging behind, it would call this RaftServer#pause and then install the snapshot. It should not need a client to tell it to pause the Ratis server. Install snapshot in OM happens automatically when required.
   
   @amaliujia I copied this from https://issues.apache.org/jira/browse/RATIS-624.  If I'm wrong, please correct me.
   As this comment said, the follower should detect the lag by itself and then call `pause` itself, rather than the leader calling `follower.pause`.

