You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text rendering).
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/09/24 23:58:11 UTC
[incubator-ratis] branch master updated: RATIS-1043. Add
Pause/Resume to RaftServerImpl (#188) Contributed by Rui Wang
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b559f48 RATIS-1043. Add Pause/Resume to RaftServerImpl (#188) Contributed by Rui Wang
b559f48 is described below
commit b559f48c870a7ca3391d77e3c602eccc68eb5b7f
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Thu Sep 24 16:58:04 2020 -0700
RATIS-1043. Add Pause/Resume to RaftServerImpl (#188) Contributed by Rui Wang
---
.../main/java/org/apache/ratis/util/LifeCycle.java | 8 ++
.../apache/ratis/server/impl/RaftServerImpl.java | 48 +++++++++++-
.../test/java/org/apache/ratis/RaftTestUtil.java | 35 ++++++++-
.../ratis/server/impl/ServerPauseResumeTest.java | 88 ++++++++++++++++++++++
.../ratis/grpc/TestServerPauseResumeWithGrpc.java | 26 +++++++
.../netty/TestServerPauseResumeWithNetty.java | 26 +++++++
.../TestServerPauseResumeWithSimulatedRpc.java | 26 +++++++
7 files changed, 253 insertions(+), 4 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index a2929f2..f789e27 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -82,6 +82,11 @@ public class LifeCycle {
return States.CLOSING_OR_CLOSED.contains(this);
}
+ /** @return whether this state is a member of {@link States#PAUSING_OR_PAUSED}. */
+ public boolean isPausingOrPaused() {
+ final Set<State> pausingOrPaused = States.PAUSING_OR_PAUSED;
+ return pausingOrPaused.contains(this);
+ }
+
static void put(State key, Map<State, List<State>> map, State... values) {
map.put(key, Collections.unmodifiableList(Arrays.asList(values)));
}
@@ -128,6 +133,9 @@ public class LifeCycle {
public static final Set<State> CLOSING_OR_CLOSED
= Collections.unmodifiableSet(EnumSet.of(State.CLOSING, State.CLOSED));
+ /** An unmodifiable set holding exactly {@link State#PAUSING} and {@link State#PAUSED}. */
+ public static final Set<State> PAUSING_OR_PAUSED
+ = Collections.unmodifiableSet(EnumSet.of(State.PAUSING, State.PAUSED));
+
public static final Set<State> CLOSING_OR_CLOSED_OR_EXCEPTION
= Collections.unmodifiableSet(EnumSet.of(State.CLOSING, State.CLOSED, State.EXCEPTION));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 88ac669..e99b847 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -67,7 +67,10 @@ import java.util.stream.Collectors;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
import static org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static org.apache.ratis.util.LifeCycle.State.EXCEPTION;
import static org.apache.ratis.util.LifeCycle.State.NEW;
+import static org.apache.ratis.util.LifeCycle.State.PAUSED;
+import static org.apache.ratis.util.LifeCycle.State.PAUSING;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
import static org.apache.ratis.util.LifeCycle.State.STARTING;
@@ -367,6 +370,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return role.isLeader();
}
+ /** @return true when this server's life cycle is currently PAUSING or PAUSED. */
+ public boolean isPausingOrPaused() {
+ final LifeCycle.State current = lifeCycle.getCurrentState();
+ return current.isPausingOrPaused();
+ }
+
/**
* return ref to the commit info cache.
* @return commit info cache
@@ -899,6 +906,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
synchronized (this) {
+ // Check life cycle state again to avoid the PAUSING/PAUSED state.
+ assertLifeCycleState(LifeCycle.States.RUNNING);
final FollowerState fs = role.getFollowerState().orElse(null);
if (shouldWithholdVotes(candidateTerm)) {
LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
@@ -1048,7 +1057,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
@SuppressWarnings("checkstyle:parameternumber")
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
- List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
+ List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
final boolean isHeartbeat = entries.length == 0;
logAppendEntries(isHeartbeat,
() -> getMemberId() + ": receive appendEntries(" + leaderId + ", " + leaderTerm + ", "
@@ -1061,6 +1070,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final Optional<FollowerState> followerState;
Timer.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
synchronized (this) {
+ // Check life cycle state again to avoid the PAUSING/PAUSED state.
+ assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
@@ -1200,6 +1211,41 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return reply;
}
+ public boolean pause() throws IOException {
+ // TODO: should pause() be limited on only working for a follower?
+
+ // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
+ // Pause() should pause ongoing operations:
+ // a. call {@link StateMachine#pause()}.
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(RUNNING, PAUSING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be paused?
+ stateMachine.pause();
+ lifeCycle.compareAndTransition(PAUSING, PAUSED);
+ }
+ return true;
+ }
+
+ /**
+ * Resume a paused server: move the life cycle PAUSED -> STARTING, reinitialize the
+ * state machine, then move STARTING -> RUNNING.
+ *
+ * @return true if the server was PAUSED and reached RUNNING; false if it was not PAUSED
+ * (nothing is done in that case) or if its life cycle state was changed concurrently.
+ * @throws IOException if the state machine fails to reinitialize; the life cycle is then
+ * moved to EXCEPTION before the exception is rethrown.
+ */
+ public boolean resume() throws IOException {
+ synchronized (this) {
+ if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
+ return false;
+ }
+ // TODO: any other operations that needs to be resumed?
+ try {
+ stateMachine.reinitialize();
+ } catch (IOException e) {
+ // Pass e as the trailing argument so that the cause and stack trace are logged.
+ LOG.warn("Failed to reinitialize statemachine: {}", stateMachine.toString(), e);
+ lifeCycle.compareAndTransition(STARTING, EXCEPTION);
+ throw e;
+ }
+ return lifeCycle.compareAndTransition(STARTING, RUNNING);
+ }
+ }
+
private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 1edbe53..f743bc9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -149,6 +149,10 @@ public interface RaftTestUtil {
return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages);
}
+ static boolean logEntriesNotContains(RaftLog log, SimpleMessage... expectedMessages) {
+ return logEntriesNotContains(log, 0L, Long.MAX_VALUE, expectedMessages);
+ }
+
static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
int idxEntries = 0;
int idxExpected = 0;
@@ -168,6 +172,29 @@ public interface RaftTestUtil {
return idxExpected == expectedMessages.length;
}
+ // Check whether raftlog contains any expected message between startIndex and endIndex.
+ // Return true if raftlog does not contain any expected message, returns false otherwise.
+ static boolean logEntriesNotContains(RaftLog log, long startIndex, long endIndex, SimpleMessage... expectedMessages) {
+ int idxEntries = 0;
+ int idxExpected = 0;
+ TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+ while (idxEntries < termIndices.length
+ && idxExpected < expectedMessages.length) {
+ try {
+ if (Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
+ log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) {
+ return false;
+ } else {
+ ++idxExpected;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ ++idxEntries;
+ }
+ return true;
+ }
+
static void checkLogEntries(RaftLog log, SimpleMessage[] expectedMessages,
Predicate<LogEntryProto> predicate) {
TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
@@ -425,8 +452,8 @@ public interface RaftTestUtil {
Thread.sleep(3 * maxTimeout.toLong(TimeUnit.MILLISECONDS));
}
- static void sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
- new Thread(() -> {
+ static Thread sendMessageInNewThread(MiniRaftCluster cluster, RaftPeerId leaderId, SimpleMessage... messages) {
+ Thread t = new Thread(() -> {
try (final RaftClient client = cluster.createClient(leaderId)) {
for (SimpleMessage mssg: messages) {
client.send(mssg);
@@ -434,7 +461,9 @@ public interface RaftTestUtil {
} catch (Exception e) {
e.printStackTrace();
}
- }).start();
+ });
+ t.start();
+ return t;
}
static void assertSameLog(RaftLog expected, RaftLog computed) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
new file mode 100644
index 0000000..ff3aa5c
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test server pause and resume.
+ *
+ * A follower is paused and a second batch of messages is written through the leader. The
+ * paused follower's log must contain none of those messages. Once the follower is resumed,
+ * its log is expected to catch up with the second batch.
+ */
+public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+ public static final int NUM_SERVERS = 3;
+
+ @Test
+ public void testPauseResume() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
+ }
+
+ void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
+ // Fixed wait (5x the cluster's max timeout) used after each write/resume to let replication settle.
+ final long settleMillis = cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5;
+
+ // wait leader be elected.
+ RaftServerImpl leader = waitForLeader(cluster);
+ RaftPeerId leaderId = leader.getId();
+ List<RaftServerImpl> followers = cluster.getFollowers();
+ Assert.assertFalse("Expected at least one follower", followers.isEmpty());
+ RaftServerImpl follower = followers.get(0);
+
+ SimpleMessage[] batch1 = SimpleMessage.create(100, "batch1");
+ Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
+
+ writeThread.join();
+ Thread.sleep(settleMillis);
+ RaftLog leaderLog = leader.getState().getLog();
+ // leader should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
+ RaftLog followerLog = follower.getState().getLog();
+ // follower should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
+
+ // pause follower.
+ boolean isSuccess = follower.pause();
+ Assert.assertTrue(isSuccess);
+ Assert.assertTrue(follower.isPausingOrPaused());
+
+ SimpleMessage[] batch2 = SimpleMessage.create(100, "batch2");
+ Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
+
+ writeThread2.join();
+ Thread.sleep(settleMillis);
+ // paused follower should not have any batch2 message in its raftlog.
+ Assert.assertTrue(RaftTestUtil.logEntriesNotContains(followerLog, batch2));
+
+ // resume follower.
+ isSuccess = follower.resume();
+ Assert.assertTrue(isSuccess);
+ Assert.assertFalse(follower.isPausingOrPaused());
+
+ Thread.sleep(settleMillis);
+ // follower should contain all logs.
+ Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch2));
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java
new file mode 100644
index 0000000..c46e841
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestServerPauseResumeWithGrpc.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+/** Runs {@link ServerPauseResumeTest} against a {@link MiniRaftClusterWithGrpc}. */
+public class TestServerPauseResumeWithGrpc
+ extends ServerPauseResumeTest<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java
new file mode 100644
index 0000000..6ef88d1
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/netty/TestServerPauseResumeWithNetty.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+public class TestServerPauseResumeWithNetty
+ extends ServerPauseResumeTest<MiniRaftClusterWithNetty>
+ implements MiniRaftClusterWithNetty.FactoryGet {
+
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java
new file mode 100644
index 0000000..f273c8e
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerPauseResumeWithSimulatedRpc.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.impl.ServerPauseResumeTest;
+
+/** Runs {@link ServerPauseResumeTest} against a {@link MiniRaftClusterWithSimulatedRpc}. */
+public class TestServerPauseResumeWithSimulatedRpc
+ extends ServerPauseResumeTest<MiniRaftClusterWithSimulatedRpc>
+ implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}