You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/27 04:39:32 UTC
[incubator-ratis] branch master updated: RATIS-1266. Leader send
StartLeaderElectionRequest to higher priority peer (#376)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 79fd5af RATIS-1266. Leader send StartLeaderElectionRequest to higher priority peer (#376)
79fd5af is described below
commit 79fd5afde2b4cd92ef86ebea7a047204d981a2a2
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Dec 27 12:39:21 2020 +0800
RATIS-1266. Leader send StartLeaderElectionRequest to higher priority peer (#376)
* RATIS-1266. Leader send TimeoutNow request to higher priority peer
---
.../grpc/server/GrpcServerProtocolClient.java | 7 +++
.../grpc/server/GrpcServerProtocolService.java | 14 ++++++
.../org/apache/ratis/grpc/server/GrpcService.java | 8 ++++
.../ratis/hadooprpc/server/HadoopRpcService.java | 9 ++++
.../RaftServerProtocolServerSideTranslatorPB.java | 9 ++++
.../src/main/proto/HadoopCompatability.proto | 1 +
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 2 +
.../apache/ratis/netty/server/NettyRpcService.java | 19 ++++++++
ratis-proto/src/main/proto/Grpc.proto | 3 ++
ratis-proto/src/main/proto/Netty.proto | 2 +
ratis-proto/src/main/proto/Raft.proto | 9 ++++
.../ratis/server/protocol/RaftServerProtocol.java | 4 ++
.../apache/ratis/server/impl/LeaderStateImpl.java | 39 +++++++++++------
.../apache/ratis/server/impl/RaftServerImpl.java | 50 ++++++++++++++++++++++
.../apache/ratis/server/impl/RaftServerProxy.java | 7 +++
.../apache/ratis/server/impl/ServerProtoUtils.java | 17 ++++++++
.../server/impl/BlockRequestHandlingInjection.java | 1 +
.../ratis/server/simulation/RaftServerReply.java | 33 ++++++++++++--
.../ratis/server/simulation/RaftServerRequest.java | 32 ++++++++++++--
.../server/simulation/SimulatedServerRpc.java | 12 ++++++
.../ratis/datastream/DataStreamBaseTest.java | 7 +++
21 files changed, 265 insertions(+), 20 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index ea1db78..fcf8126 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -100,6 +100,13 @@ public class GrpcServerProtocolClient implements Closeable {
return r;
}
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) {
+ StartLeaderElectionReplyProto r =
+ blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .startLeaderElection(request);
+ return r;
+ }
+
StreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseHandler) {
return asyncStub.appendEntries(responseHandler);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index 66d69b6..90386fd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -177,6 +177,20 @@ class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
}
@Override
+ public void startLeaderElection(StartLeaderElectionRequestProto request,
+ StreamObserver<StartLeaderElectionReplyProto> responseObserver) {
+ try {
+ final StartLeaderElectionReplyProto reply = server.startLeaderElection(request);
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ } catch (Throwable e) {
+ GrpcUtil.warn(LOG,
+ () -> getId() + ": Failed startLeaderElection " + ProtoUtils.toString(request.getServerRequest()), e);
+ responseObserver.onError(GrpcUtil.wrapException(e));
+ }
+ }
+
+ @Override
public StreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseObserver) {
return new ServerRequestStreamObserver<AppendEntriesRequestProto, AppendEntriesReplyProto>(
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 57500e7..f570672 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -223,4 +223,12 @@ public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocol
final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
return getProxies().getProxy(target).requestVote(request);
}
+
+ @Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, request);
+
+ final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
+ return getProxies().getProxy(target).startLeaderElection(request);
+ }
}
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index a9ca1a9..52c2c31 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -30,6 +30,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.HadoopServerProtocolService;
import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.HadoopClientProtocolService;
import org.apache.ratis.proto.hadoop.HadoopCompatibilityProtos.ServerOps;
@@ -183,6 +185,13 @@ public final class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftSer
ServerOps.requestVote, RequestVoteReplyProto::parseFrom);
}
+ @Override
+ public StartLeaderElectionReplyProto startLeaderElection(
+ StartLeaderElectionRequestProto request) throws IOException {
+ return processRequest(request, request.getServerRequest().getReplyId(),
+ ServerOps.startLeaderElection, StartLeaderElectionReplyProto::parseFrom);
+ }
+
private <REQUEST extends GeneratedMessageV3, REPLY> REPLY processRequest(
REQUEST request, org.apache.ratis.thirdparty.com.google.protobuf.ByteString replyId,
ServerOps type, CheckedFunction<byte[], REPLY, InvalidProtocolBufferException> func)
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
index c5d1948..9f61207 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
@@ -33,6 +33,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.thirdparty.com.google.protobuf.GeneratedMessageV3;
@InterfaceAudience.Private
@@ -54,6 +56,9 @@ public class RaftServerProtocolServerSideTranslatorPB
case requestVote:
respone = requestVote(RequestVoteRequestProto.parseFrom(buffer));
break;
+ case startLeaderElection:
+ respone = startLeaderElection(StartLeaderElectionRequestProto.parseFrom(buffer));
+ break;
case installSnapshot:
respone = installSnapshot(InstallSnapshotRequestProto.parseFrom(buffer));
break;
@@ -77,6 +82,10 @@ public class RaftServerProtocolServerSideTranslatorPB
return impl.requestVote(request);
}
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ return impl.startLeaderElection(request);
+ }
+
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request)
throws IOException {
return impl.appendEntries(request);
diff --git a/ratis-hadoop/src/main/proto/HadoopCompatability.proto b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
index 9fec7c2..1b67755 100644
--- a/ratis-hadoop/src/main/proto/HadoopCompatability.proto
+++ b/ratis-hadoop/src/main/proto/HadoopCompatability.proto
@@ -34,6 +34,7 @@ enum ServerOps {
requestVote = 1;
appendEntries = 2;
installSnapshot = 3;
+ startLeaderElection = 4;
}
message ServerRequestProto {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 65a8052..b2b8763 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -79,6 +79,8 @@ public class NettyRpcProxy implements Closeable {
switch (proto.getRaftNettyServerReplyCase()) {
case REQUESTVOTEREPLY:
return proto.getRequestVoteReply().getServerReply().getCallId();
+ case STARTLEADERELECTIONREPLY:
+ return proto.getStartLeaderElectionReply().getServerReply().getCallId();
case APPENDENTRIESREPLY:
return proto.getAppendEntriesReply().getServerReply().getCallId();
case INSTALLSNAPSHOTREPLY:
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 7d96579..50279d6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -184,6 +184,13 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(transferLeadershipReply))
.build();
+ case STARTLEADERELECTIONREQUEST:
+ final StartLeaderElectionRequestProto startLeaderElectionRequest = proto.getStartLeaderElectionRequest();
+ rpcRequest = startLeaderElectionRequest.getServerRequest();
+ final StartLeaderElectionReplyProto startLeaderElectionReply =
+ server.startLeaderElection(startLeaderElectionRequest);
+ return RaftNettyServerReplyProto.newBuilder().setStartLeaderElectionReply(startLeaderElectionReply).build();
+
case APPENDENTRIESREQUEST:
final AppendEntriesRequestProto appendEntriesRequest = proto.getAppendEntriesRequest();
rpcRequest = appendEntriesRequest.getServerRequest();
@@ -282,6 +289,18 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
return sendRaftNettyServerRequestProto(serverRequest, proto).getRequestVoteReply();
}
+
+ @Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request);
+
+ final RaftNettyServerRequestProto proto = RaftNettyServerRequestProto.newBuilder()
+ .setStartLeaderElectionRequest(request)
+ .build();
+ final RaftRpcRequestProto serverRequest = request.getServerRequest();
+ return sendRaftNettyServerRequestProto(serverRequest, proto).getStartLeaderElectionReply();
+ }
+
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException {
CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, getId(), null, request);
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index bd97961..14b6067 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -44,6 +44,9 @@ service RaftServerProtocolService {
rpc requestVote(ratis.common.RequestVoteRequestProto)
returns(ratis.common.RequestVoteReplyProto) {}
+ rpc startLeaderElection(ratis.common.StartLeaderElectionRequestProto)
+ returns(ratis.common.StartLeaderElectionReplyProto) {}
+
rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
returns(stream ratis.common.AppendEntriesReplyProto) {}
diff --git a/ratis-proto/src/main/proto/Netty.proto b/ratis-proto/src/main/proto/Netty.proto
index 17155a4..cea3ada 100644
--- a/ratis-proto/src/main/proto/Netty.proto
+++ b/ratis-proto/src/main/proto/Netty.proto
@@ -39,6 +39,7 @@ message RaftNettyServerRequestProto {
ratis.common.GroupListRequestProto groupListRequest = 7;
ratis.common.GroupInfoRequestProto groupInfoRequest = 8;
ratis.common.TransferLeadershipRequestProto transferLeadershipRequest = 9;
+ ratis.common.StartLeaderElectionRequestProto startLeaderElectionRequest = 10;
}
}
@@ -51,5 +52,6 @@ message RaftNettyServerReplyProto {
ratis.common.GroupListReplyProto groupListReply = 5;
ratis.common.GroupInfoReplyProto groupInfoReply = 6;
RaftNettyExceptionReplyProto exceptionReply = 7;
+ ratis.common.StartLeaderElectionReplyProto startLeaderElectionReply = 8;
}
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1d5e57e..822783a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -392,6 +392,15 @@ message TransferLeadershipRequestProto {
RaftPeerProto newLeader = 2;
}
+message StartLeaderElectionRequestProto {
+ RaftRpcRequestProto serverRequest = 1;
+ TermIndexProto leaderLastEntry = 2;
+}
+
+message StartLeaderElectionReplyProto {
+ RaftRpcReplyProto serverReply = 1;
+}
+
// A request to add a new group
message GroupAddRequestProto {
RaftGroupProto group = 1; // the group to be added.
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
index c8cfad6..6e707ee 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
@@ -25,6 +25,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
public interface RaftServerProtocol {
enum Op {REQUEST_VOTE, APPEND_ENTRIES, INSTALL_SNAPSHOT}
@@ -34,4 +36,6 @@ public interface RaftServerProtocol {
AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException;
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException;
+
+ StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d54e337..4bf79c2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -23,6 +23,8 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -532,16 +534,27 @@ class LeaderStateImpl implements LeaderState {
}
}
- private synchronized void yieldLeaderToHigherPriorityPeer(long term, TermIndex lastEntry) {
+ private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) {
ServerState state = server.getState();
TermIndex currLastEntry = state.getLastEntry();
if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
- LOG.warn("{} can not stepDown because currLastEntry:{} did not match lastEntry:{}",
- this, currLastEntry, lastEntry);
+ LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " +
+ "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
return;
}
- stepDown(term, StepDownReason.HIGHER_PRIORITY);
+ final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
+ server.getMemberId(), follower, lastEntry);
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r);
+ LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}",
+ this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower);
+ } catch (IOException e) {
+ LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e);
+ }
+ return null;
+ });
}
private void prepare() {
@@ -872,23 +885,21 @@ class LeaderStateImpl implements LeaderState {
final TermIndex leaderLastEntry = server.getState().getLastEntry();
if (leaderLastEntry == null) {
- LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
- "and leader's lastEntry is null",
- this, currentTerm, followerID, followerPriority, leaderPriority);
+ LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
+ "is higher than leader's:{} and leader's lastEntry is null",
+ this, followerID, currentTerm, followerPriority, leaderPriority);
- // step down as follower
- yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
+ sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
return;
}
if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
- LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
- "and follower's lastEntry index:{} catch up with leader's:{}",
- this, currentTerm, followerID, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
+ LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
+ "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}",
+ this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
leaderLastEntry.getIndex());
- // step down as follower
- yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
+ sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
return;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f24c3bc..5f2c417 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -96,6 +96,7 @@ class RaftServerImpl implements RaftServer.Division,
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete";
+ static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection";
class Info implements DivisionInfo {
@Override
@@ -1387,6 +1388,55 @@ class RaftServerImpl implements RaftServer.Division,
return true;
}
+ @Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ final RaftRpcRequestProto r = request.getServerRequest();
+ final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
+ final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
+ final TermIndex leaderLastEntry = TermIndex.valueOf(request.getLeaderLastEntry());
+
+ CodeInjectionForTesting.execute(START_LEADER_ELECTION, getId(), leaderId, request);
+
+ LOG.debug("{}: receive startLeaderElection from:{}, leaderLastEntry:{},",
+ getMemberId(), leaderId, request.getLeaderLastEntry());
+
+ assertLifeCycleState(LifeCycle.States.RUNNING);
+ assertGroup(leaderId, leaderGroupId);
+
+ synchronized (this) {
+ // leaderLastEntry should not be null because LeaderStateImpl#start append a placeHolder entry
+ // so leader at each term should has at least one entry
+ if (leaderLastEntry == null) {
+ LOG.warn("{}: receive null leaderLastEntry which is unexpected", getMemberId());
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ }
+
+ // Check life cycle state again to avoid the PAUSING/PAUSED state.
+ assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
+ final boolean recognized = state.recognizeLeader(leaderId, leaderLastEntry.getTerm());
+ if (!recognized) {
+ LOG.warn("{}: Not recognize {} (term={}) as leader, state: {}",
+ getMemberId(), leaderId, leaderLastEntry.getTerm(), state);
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ }
+
+ if (!getInfo().isFollower()) {
+ LOG.warn("{} refused StartLeaderElectionRequest from {}, because role is:{}",
+ getMemberId(), leaderId, role.getCurrentRole());
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ }
+
+ if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
+ LOG.warn("{} refused StartLeaderElectionRequest from {}, because lastEntry:{} less than leaderEntry:{}",
+ getMemberId(), leaderId, leaderLastEntry, state.getLastEntry());
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
+ }
+
+ changeToCandidate();
+ return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
+ }
+ }
+
private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6e4c302..e6408f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -28,6 +28,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
@@ -545,6 +547,11 @@ class RaftServerProxy implements RaftServer {
}
@Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ return getImpl(request.getServerRequest()).startLeaderElection(request);
+ }
+
+ @Override
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f264652..ca672f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -58,6 +58,23 @@ final class ServerProtoUtils {
return b.build();
}
+ static StartLeaderElectionReplyProto toStartLeaderElectionReplyProto(
+ RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
+ return StartLeaderElectionReplyProto.newBuilder()
+ .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, success))
+ .build();
+ }
+
+ static StartLeaderElectionRequestProto toStartLeaderElectionRequestProto(
+ RaftGroupMemberId requestorId, RaftPeerId replyId, TermIndex lastEntry) {
+ final StartLeaderElectionRequestProto.Builder b = StartLeaderElectionRequestProto.newBuilder()
+ .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId));
+ if (lastEntry != null) {
+ b.setLeaderLastEntry(lastEntry.toProto());
+ }
+ return b.build();
+ }
+
static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId,
long currentTerm, int requestIndex, InstallSnapshotResult result) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
index 8566047..3487aa4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/BlockRequestHandlingInjection.java
@@ -33,6 +33,7 @@ public class BlockRequestHandlingInjection implements CodeInjectionForTesting.Co
CodeInjectionForTesting.put(RaftServerImpl.REQUEST_VOTE, INSTANCE);
CodeInjectionForTesting.put(RaftServerImpl.APPEND_ENTRIES, INSTANCE);
CodeInjectionForTesting.put(RaftServerImpl.INSTALL_SNAPSHOT, INSTANCE);
+ CodeInjectionForTesting.put(RaftServerImpl.START_LEADER_ELECTION, INSTANCE);
}
public static BlockRequestHandlingInjection getInstance() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
index d5545cb..5d9d871 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerReply.java
@@ -17,11 +17,13 @@
*/
package org.apache.ratis.server.simulation;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
import org.apache.ratis.util.ProtoUtils;
import java.util.Objects;
@@ -30,23 +32,34 @@ public class RaftServerReply implements RaftRpcMessage {
private final AppendEntriesReplyProto appendEntries;
private final RequestVoteReplyProto requestVote;
private final InstallSnapshotReplyProto installSnapshot;
+ private final StartLeaderElectionReplyProto startLeaderElection;
RaftServerReply(AppendEntriesReplyProto a) {
appendEntries = Objects.requireNonNull(a);
requestVote = null;
installSnapshot = null;
+ startLeaderElection = null;
}
RaftServerReply(RequestVoteReplyProto r) {
appendEntries = null;
requestVote = Objects.requireNonNull(r);
installSnapshot = null;
+ startLeaderElection = null;
}
RaftServerReply(InstallSnapshotReplyProto i) {
appendEntries = null;
requestVote = null;
installSnapshot = Objects.requireNonNull(i);
+ startLeaderElection = null;
+ }
+
+ RaftServerReply(StartLeaderElectionReplyProto i) {
+ appendEntries = null;
+ requestVote = null;
+ installSnapshot = null;
+ startLeaderElection = Objects.requireNonNull(i);
}
boolean isAppendEntries() {
@@ -61,6 +74,10 @@ public class RaftServerReply implements RaftRpcMessage {
return installSnapshot != null;
}
+ boolean isStartLeaderElection() {
+ return startLeaderElection != null;
+ }
+
AppendEntriesReplyProto getAppendEntries() {
return appendEntries;
}
@@ -73,6 +90,10 @@ public class RaftServerReply implements RaftRpcMessage {
return installSnapshot;
}
+ StartLeaderElectionReplyProto getStartLeaderElection() {
+ return startLeaderElection;
+ }
+
@Override
public boolean isRequest() {
return false;
@@ -84,8 +105,10 @@ public class RaftServerReply implements RaftRpcMessage {
return appendEntries.getServerReply().getRequestorId().toStringUtf8();
} else if (isRequestVote()) {
return requestVote.getServerReply().getRequestorId().toStringUtf8();
- } else {
+ } else if (isInstallSnapshot()) {
return installSnapshot.getServerReply().getRequestorId().toStringUtf8();
+ } else {
+ return startLeaderElection.getServerReply().getRequestorId().toStringUtf8();
}
}
@@ -95,8 +118,10 @@ public class RaftServerReply implements RaftRpcMessage {
return appendEntries.getServerReply().getReplyId().toStringUtf8();
} else if (isRequestVote()) {
return requestVote.getServerReply().getReplyId().toStringUtf8();
- } else {
+ } else if (isInstallSnapshot()) {
return installSnapshot.getServerReply().getReplyId().toStringUtf8();
+ } else {
+ return startLeaderElection.getServerReply().getReplyId().toStringUtf8();
}
}
@@ -106,8 +131,10 @@ public class RaftServerReply implements RaftRpcMessage {
return ProtoUtils.toRaftGroupId(appendEntries.getServerReply().getRaftGroupId());
} else if (isRequestVote()) {
return ProtoUtils.toRaftGroupId(requestVote.getServerReply().getRaftGroupId());
- } else {
+ } else if (isInstallSnapshot()) {
return ProtoUtils.toRaftGroupId(installSnapshot.getServerReply().getRaftGroupId());
+ } else {
+ return ProtoUtils.toRaftGroupId(startLeaderElection.getServerReply().getRaftGroupId());
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
index 1ec791e..787052d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/RaftServerRequest.java
@@ -22,29 +22,41 @@ import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.util.ProtoUtils;
class RaftServerRequest implements RaftRpcMessage {
private final AppendEntriesRequestProto appendEntries;
private final RequestVoteRequestProto requestVote;
private final InstallSnapshotRequestProto installSnapshot;
+ private final StartLeaderElectionRequestProto startLeaderElection;
RaftServerRequest(AppendEntriesRequestProto a) {
appendEntries = a;
requestVote = null;
installSnapshot = null;
+ startLeaderElection = null;
}
RaftServerRequest(RequestVoteRequestProto r) {
appendEntries = null;
requestVote = r;
installSnapshot = null;
+ startLeaderElection = null;
}
RaftServerRequest(InstallSnapshotRequestProto i) {
appendEntries = null;
requestVote = null;
installSnapshot = i;
+ startLeaderElection = null;
+ }
+
+ RaftServerRequest(StartLeaderElectionRequestProto i) {
+ appendEntries = null;
+ requestVote = null;
+ installSnapshot = null;
+ startLeaderElection = i;
}
boolean isAppendEntries() {
@@ -59,6 +71,10 @@ class RaftServerRequest implements RaftRpcMessage {
return installSnapshot != null;
}
+ boolean isStartLeaderElection() {
+ return startLeaderElection != null;
+ }
+
AppendEntriesRequestProto getAppendEntries() {
return appendEntries;
}
@@ -71,6 +87,10 @@ class RaftServerRequest implements RaftRpcMessage {
return installSnapshot;
}
+ StartLeaderElectionRequestProto getStartLeaderElection() {
+ return startLeaderElection;
+ }
+
@Override
public boolean isRequest() {
return true;
@@ -82,8 +102,10 @@ class RaftServerRequest implements RaftRpcMessage {
return appendEntries.getServerRequest().getRequestorId().toStringUtf8();
} else if (isRequestVote()) {
return requestVote.getServerRequest().getRequestorId().toStringUtf8();
- } else {
+ } else if (isInstallSnapshot()) {
return installSnapshot.getServerRequest().getRequestorId().toStringUtf8();
+ } else {
+ return startLeaderElection.getServerRequest().getRequestorId().toStringUtf8();
}
}
@@ -93,8 +115,10 @@ class RaftServerRequest implements RaftRpcMessage {
return appendEntries.getServerRequest().getReplyId().toStringUtf8();
} else if (isRequestVote()) {
return requestVote.getServerRequest().getReplyId().toStringUtf8();
- } else {
+ } else if (isInstallSnapshot()) {
return installSnapshot.getServerRequest().getReplyId().toStringUtf8();
+ } else {
+ return startLeaderElection.getServerRequest().getReplyId().toStringUtf8();
}
}
@@ -104,8 +128,10 @@ class RaftServerRequest implements RaftRpcMessage {
return ProtoUtils.toRaftGroupId(appendEntries.getServerRequest().getRaftGroupId());
} else if (isRequestVote()) {
return ProtoUtils.toRaftGroupId(requestVote.getServerRequest().getRaftGroupId());
- } else {
+ } else if (isInstallSnapshot()) {
return ProtoUtils.toRaftGroupId(installSnapshot.getServerRequest().getRaftGroupId());
+ } else {
+ return ProtoUtils.toRaftGroupId(startLeaderElection.getServerRequest().getRaftGroupId());
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index e11c339..1e648dc 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -17,12 +17,15 @@
*/
package org.apache.ratis.server.simulation;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
@@ -126,6 +129,13 @@ class SimulatedServerRpc implements RaftServerRpc {
}
@Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request)
+ throws IOException {
+ RaftServerReply reply = serverHandler.getRpc().sendRequest(new RaftServerRequest(request));
+ return reply.getStartLeaderElection();
+ }
+
+ @Override
public void addRaftPeers(Collection<RaftPeer> peers) {
// do nothing
}
@@ -151,6 +161,8 @@ class SimulatedServerRpc implements RaftServerRpc {
return new RaftServerReply(server.requestVote(r.getRequestVote()));
} else if (r.isInstallSnapshot()) {
return new RaftServerReply(server.installSnapshot(r.getInstallSnapshot()));
+ } else if (r.isStartLeaderElection()) {
+ return new RaftServerReply(server.startLeaderElection(r.getStartLeaderElection()));
} else {
throw new IllegalStateException("unexpected state");
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index e1b1b1f..b554bf7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,6 +39,8 @@ import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
@@ -253,6 +255,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
+ return null;
+ }
+
+ @Override
public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
return null;
}