You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/09/24 23:58:11 UTC

[incubator-ratis] branch master updated: RATIS-1043. Add Pause/Resume to RaftServerImpl (#188) Contributed by Rui Wang

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new b559f48  RATIS-1043. Add Pause/Resume to RaftServerImpl (#188)  Contributed by Rui Wang
b559f48 is described below

commit b559f48c870a7ca3391d77e3c602eccc68eb5b7f
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Thu Sep 24 16:58:04 2020 -0700

    RATIS-1043. Add Pause/Resume to RaftServerImpl (#188)  Contributed by Rui Wang
---
 .../main/java/org/apache/ratis/util/LifeCycle.java |  8 ++
 .../apache/ratis/server/impl/RaftServerImpl.java   | 48 +++++++++++-
 .../test/java/org/apache/ratis/RaftTestUtil.java   | 35 ++++++++-
 .../ratis/server/impl/ServerPauseResumeTest.java   | 88 ++++++++++++++++++++++
 .../ratis/grpc/TestServerPauseResumeWithGrpc.java  | 26 +++++++
 .../netty/TestServerPauseResumeWithNetty.java      | 26 +++++++
 .../TestServerPauseResumeWithSimulatedRpc.java     | 26 +++++++
 7 files changed, 253 insertions(+), 4 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index a2929f2..f789e27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -82,6 +82,11 @@ public class LifeCycle {
       return States.CLOSING_OR_CLOSED.contains(this);
     }
 
+    /** @return true if this state is {@link State#PAUSING} or {@link State#PAUSED}; false otherwise. */
+    public boolean isPausingOrPaused() {
+      return States.PAUSING_OR_PAUSED.contains(this);
+    }
+
     static void put(State key, Map<State, List<State>> map, State... values) {
       map.put(key, Collections.unmodifiableList(Arrays.asList(values)));
     }
@@ -128,6 +133,9 @@ public class LifeCycle {
     public static final Set<State> CLOSING_OR_CLOSED
         = Collections.unmodifiableSet(EnumSet.of(State.CLOSING, State.CLOSED));
 
+    public static final Set<State> PAUSING_OR_PAUSED
+        = Collections.unmodifiableSet(EnumSet.of(State.PAUSING, State.PAUSED));
+
     public static final Set<State> CLOSING_OR_CLOSED_OR_EXCEPTION
         = Collections.unmodifiableSet(EnumSet.of(State.CLOSING, State.CLOSED, State.EXCEPTION));
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 88ac669..e99b847 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -67,7 +67,10 @@ import java.util.stream.Collectors;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
 import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
 import static org.apache.ratis.util.LifeCycle.State.NEW;
+import static org.apache.ratis.util.LifeCycle.State.PAUSED;
+import static org.apache.ratis.util.LifeCycle.State.PAUSING;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
 import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
@@ -367,6 +370,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return role.isLeader();
   }
 
+  public boolean isPausingOrPaused() {
+    return lifeCycle.getCurrentState().isPausingOrPaused(); // current state is PAUSING or PAUSED
+  }
+
   /**
    * return ref to the commit info cache.
    * @return commit info cache
@@ -899,6 +906,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     boolean shouldShutdown = false;
     final RequestVoteReplyProto reply;
     synchronized (this) {
+      // Check life cycle state again to avoid the PAUSING/PAUSED state.
+      assertLifeCycleState(LifeCycle.States.RUNNING);
       final FollowerState fs = role.getFollowerState().orElse(null);
       if (shouldWithholdVotes(candidateTerm)) {
         LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
@@ -1048,7 +1057,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   @SuppressWarnings("checkstyle:parameternumber")
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
-      List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
+      List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
     final boolean isHeartbeat = entries.length == 0;
     logAppendEntries(isHeartbeat,
         () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
@@ -1061,6 +1070,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     final Optional<FollowerState> followerState;
     Timer.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
     synchronized (this) {
+      // Check life cycle state again to avoid the PAUSING/PAUSED state.
+      assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
       final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
@@ -1200,6 +1211,41 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return reply;
   }
 
+  public boolean pause() throws IOException {
+    // TODO: should pause() be limited to working only on a follower?
+
+    // Transition RUNNING -> PAUSING (return false if the transition fails) so that operations
+    // checking the lifeCycle state are rejected; then pause the ongoing operations and go to PAUSED:
+    //  a. call {@link StateMachine#pause()}.
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+        return false;
+      }
+      // TODO: are there any other operations that need to be paused?
+      stateMachine.pause();
+      lifeCycle.compareAndTransition(PAUSING, PAUSED);
+    }
+    return true;
+  }
+
+  public boolean resume() throws IOException {
+    synchronized (this) {
+      if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+        return false; // the transition from PAUSED failed, so there is nothing to resume
+      }
+      // TODO: are there any other operations that need to be resumed?
+      try {
+        stateMachine.reinitialize();
+      } catch (IOException e) {
+        LOG.warn("Failed to reinitialize statemachine: {}", stateMachine, e); // keep the cause in the log
+        lifeCycle.compareAndTransition(STARTING, EXCEPTION);
+        throw e;
+      }
+      lifeCycle.compareAndTransition(STARTING, RUNNING); // reinitialize() succeeded
+    }
+    return true;
+  }
+
   private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
     final RaftRpcRequestProto r = request.getServerRequest();
     final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1edbe53..f743bc9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -149,6 +149,10 @@ public interface RaftTestUtil {
     return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages);
   }
 
+  static boolean logEntriesNotContains(RaftLog log, SimpleMessage... expectedMessages) {
+    return logEntriesNotContains(log, 0L, Long.MAX_VALUE, expectedMessages); // check from index 0 to Long.MAX_VALUE
+  }
+
   static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
@@ -168,6 +172,29 @@ public interface RaftTestUtil {
     return idxExpected == expectedMessages.length;
   }
 
+  // Check whether raftlog contains any expected message between startIndex and endIndex.
+  // Return true if raftlog does not contain any expected message, returns false otherwise.
+  static boolean logEntriesNotContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
+    int idxEntries = 0;
+    int idxExpected = 0;
+    TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+    while (idxEntries < termIndices.length
+        && idxExpected < expectedMessages.length) {
+      try {
+        if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
+            log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) {
+          return false;
+        } else {
+          ++idxExpected;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      ++idxEntries;
+    }
+    return true;
+  }
+
   static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages,
       Predicate<LogEntryProto> predicate) {
     TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
@@ -425,8 +452,8 @@ public interface RaftTestUtil {
     Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
   }
 
-  static void sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
-    new Thread(() -> {
+  static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
+    Thread t = new Thread(() -> {
       try (final RaftClient client = cluster.createClient(leaderId)) {
         for (SimpleMessage mssg: messages) {
           client.send(mssg);
@@ -434,7 +461,9 @@ public interface RaftTestUtil {
       } catch (Exception e) {
         e.printStackTrace();
       }
-    }).start();
+    });
+    t.start();
+    return t;
   }
 
   static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
new file mode 100644
index 0000000..ff3aa5c
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test that a paused follower stops appending log entries and catches up after being resumed. */
+public abstract class ServerPauseResumeTest<CLUSTER extends MiniRaftCluster>
+    extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+
+  @Test
+  public void testPauseResume() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+  }
+
+  /** Write batch1, pause a follower, write batch2, resume the follower, and check its log at each step. */
+  void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+    // wait for a leader to be elected.
+    RaftServerImpl leader = waitForLeader(cluster);
+    RaftPeerId leaderId = leader.getId();
+    List<RaftServerImpl> followers = cluster.getFollowers();
+    Assert.assertFalse(followers.isEmpty());
+    RaftServerImpl follower = followers.get(0);
+
+    SimpleMessage[] batch1 = SimpleMessage.create(100, "batch1");
+    Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+    writeThread.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    RaftLog leaderLog = leader.getState().getLog();
+    // leader should contain all batch1 messages.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+    RaftLog followerLog = follower.getState().getLog();
+    // follower should contain all batch1 messages.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+    // pause follower.
+    Assert.assertTrue(follower.pause());
+    Assert.assertTrue(follower.isPausingOrPaused());
+
+    SimpleMessage[] batch2 = SimpleMessage.create(100, "batch2");
+    Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+    writeThread2.join();
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // paused follower should not have any batch2 message in its raftlog.
+    Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+
+    // resume follower.
+    Assert.assertTrue(follower.resume());
+    Assert.assertFalse(follower.isPausingOrPaused());
+
+    // wait for the resumed follower to catch up.
+    Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
+    // follower should contain all batch2 messages.
+    Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch2));
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java
new file mode 100644
index 0000000..c46e841
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+public class TestServerPauseResumeWithGrpc
+    extends ServerPauseResumeTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+  // No members are declared here: the test cases come from ServerPauseResumeTest.
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java
new file mode 100644
index 0000000..6ef88d1
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+public class TestServerPauseResumeWithNetty
+    extends ServerPauseResumeTest<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+  // No members are declared here: the test cases come from ServerPauseResumeTest.
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java
new file mode 100644
index 0000000..f273c8e
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+public class TestServerPauseResumeWithSimulatedRpc
+    extends ServerPauseResumeTest<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+  // No members are declared here: the test cases come from ServerPauseResumeTest.
+}