You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/24 00:50:14 UTC
[incubator-ratis] branch master updated: RATIS-1260. Implement
transferLeaderShip in server (#372)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 5a4141a RATIS-1260. Implement transferLeaderShip in server (#372)
5a4141a is described below
commit 5a4141a20d6025d1071f836b711a2ec373c4f7a7
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 24 08:50:06 2020 +0800
RATIS-1260. Implement transferLeaderShip in server (#372)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 16 +++
.../org/apache/ratis/protocol/RaftClientReply.java | 13 ++-
.../exceptions/LeaderSteppingDownException.java | 33 ++++++
.../exceptions/TransferLeadershipException.java | 33 ++++++
.../src/test/java/org/apache/ratis/BaseTest.java | 11 ++
ratis-proto/src/main/proto/Raft.proto | 2 +
.../ratis/server/impl/RaftConfigurationImpl.java | 11 ++
.../apache/ratis/server/impl/RaftServerImpl.java | 85 +++++++++++++--
.../org/apache/ratis/server/impl/ServerState.java | 3 +
.../ratis/server/impl/TransferLeadership.java | 118 +++++++++++++++++++++
.../ratis/server/impl/GroupManagementBaseTest.java | 15 +--
.../ratis/server/impl/LeaderElectionTests.java | 37 +++++++
12 files changed, 353 insertions(+), 24 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 50af252..8543523 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -25,10 +25,12 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -41,9 +43,11 @@ import java.util.stream.Collectors;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.DATASTREAMEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERSTEPPINGDOWNEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
public interface ClientProtoUtils {
@@ -255,6 +259,14 @@ public interface ClientProtoUtils {
.map(ProtoUtils::toThrowableProto)
.ifPresent(b::setAlreadyClosedException);
+ Optional.ofNullable(reply.getLeaderSteppingDownException())
+ .map(ProtoUtils::toThrowableProto)
+ .ifPresent(b::setLeaderSteppingDownException);
+
+ Optional.ofNullable(reply.getTransferLeadershipException())
+ .map(ProtoUtils::toThrowableProto)
+ .ifPresent(b::setTransferLeadershipException);
+
final RaftClientReplyProto serialized = b.build();
final RaftException e = reply.getException();
if (e != null) {
@@ -340,6 +352,10 @@ public interface ClientProtoUtils {
e = new LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
} else if (replyProto.getExceptionDetailsCase().equals(ALREADYCLOSEDEXCEPTION)) {
e = ProtoUtils.toThrowable(replyProto.getAlreadyClosedException(), AlreadyClosedException.class);
+ } else if (replyProto.getExceptionDetailsCase().equals(LEADERSTEPPINGDOWNEXCEPTION)) {
+ e = ProtoUtils.toThrowable(replyProto.getLeaderSteppingDownException(), LeaderSteppingDownException.class);
+ } else if (replyProto.getExceptionDetailsCase().equals(TRANSFERLEADERSHIPEXCEPTION)) {
+ e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(), TransferLeadershipException.class);
} else {
e = null;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index e61b534..ada67c9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -21,10 +21,12 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
@@ -166,7 +168,8 @@ public class RaftClientReply extends RaftClientMessage {
Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
AlreadyClosedException.class,
NotLeaderException.class, NotReplicatedException.class,
- LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class),
+ LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class,
+ LeaderSteppingDownException.class, TransferLeadershipException.class),
() -> "Unexpected exception class: " + this);
}
}
@@ -234,6 +237,14 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, LeaderNotReadyException.class);
}
+  /**
+   * @return this reply's exception as a {@link LeaderSteppingDownException}; callers treat the
+   *         result as nullable (presumably null when the exception is absent or of another type,
+   *         per JavaUtils.cast — confirm).
+   */
+  public LeaderSteppingDownException getLeaderSteppingDownException() {
+    return JavaUtils.cast(this.exception, LeaderSteppingDownException.class);
+  }
+
+  /**
+   * @return this reply's exception as a {@link TransferLeadershipException}; callers treat the
+   *         result as nullable (presumably null when the exception is absent or of another type,
+   *         per JavaUtils.cast — confirm).
+   */
+  public TransferLeadershipException getTransferLeadershipException() {
+    return JavaUtils.cast(this.exception, TransferLeadershipException.class);
+  }
+
/** @return the exception, if there is any; otherwise, return null. */
public RaftException getException() {
return exception;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
new file mode 100644
index 0000000..bc81893
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftGroupMemberId;
+
+/**
+ * Returned by a leader that is stepping down (in RaftServerImpl: while a leadership transfer is
+ * pending) to reject a write request. The message names the group member that raised it.
+ */
+public class LeaderSteppingDownException extends RaftException {
+  /** The group member that raised this exception. */
+  private final RaftGroupMemberId serverId;
+
+  /**
+   * @param id the group member that is stepping down.
+   */
+  public LeaderSteppingDownException(RaftGroupMemberId id) {
+    super(id + " is in steppingDown");
+    serverId = id;
+  }
+
+  /** @return the group member that raised this exception. */
+  public RaftGroupMemberId getServerId() {
+    return this.serverId;
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
new file mode 100644
index 0000000..2a503aa
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftGroupMemberId;
+
+/**
+ * Reports that a leadership-transfer request was refused or failed. The message explains why.
+ */
+public class TransferLeadershipException extends RaftException {
+  /** The group member that raised this exception. */
+  private final RaftGroupMemberId serverId;
+
+  /**
+   * @param id the group member raising the exception.
+   * @param msg the reason the transfer was refused or failed.
+   */
+  public TransferLeadershipException(RaftGroupMemberId id, String msg) {
+    super(msg);
+    serverId = id;
+  }
+
+  /** @return the group member that raised this exception. */
+  public RaftGroupMemberId getServerId() {
+    return this.serverId;
+  }
+}
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 3b3d8cc..42d1e77 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -77,6 +77,17 @@ public abstract class BaseTest {
}
}
+  /**
+   * Copy the given peers, giving priority 2 to the suggested leader and priority 1 to every other peer.
+   * Peers are matched by id, so a suggested leader obtained elsewhere (e.g. a division's peer that
+   * already carries some priority) is still recognized.
+   * Only the id, address and priority are carried over to the copies.
+   *
+   * @param peers the peers to copy, in order.
+   * @param suggestedLeader the peer to prioritize; if null, every peer gets priority 1.
+   * @return a new list holding one copy per input peer, in the same order.
+   */
+  public List<RaftPeer> getPeersWithPriority(List<RaftPeer> peers, RaftPeer suggestedLeader) {
+    final List<RaftPeer> peersWithPriority = new ArrayList<>(peers.size());
+    for (RaftPeer peer : peers) {
+      final boolean isSuggested = suggestedLeader != null && peer.getId().equals(suggestedLeader.getId());
+      final int priority = isSuggested? 2: 1;
+      peersWithPriority.add(
+          RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(priority).build());
+    }
+    return peersWithPriority;
+  }
+
public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
RoutingTable.Builder builder = RoutingTable.newBuilder();
RaftPeer previous = primary;
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d06431a..16c26c0 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -372,6 +372,8 @@ message RaftClientReplyProto {
LeaderNotReadyExceptionProto leaderNotReadyException = 6;
ThrowableProto alreadyClosedException = 7;
ThrowableProto dataStreamException = 8;
+ ThrowableProto leaderSteppingDownException = 9;
+ ThrowableProto transferLeadershipException = 10;
}
uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 8a4ebd1..bb85f75 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -145,6 +145,17 @@ final class RaftConfigurationImpl implements RaftConfiguration {
return conf.contains(peerId);
}
+  /**
+   * Check whether the given peer strictly out-ranks every other peer returned by getCurrentPeers().
+   *
+   * @param peerId the peer to check.
+   * @return true iff the peer is known and every other current peer has a strictly lower priority;
+   *         false if getPeer cannot find it (assumed to return null for unknown ids — confirm).
+   */
+  boolean isHighestPriority(RaftPeerId peerId) {
+    final RaftPeer target = getPeer(peerId);
+    if (target == null) {
+      // An unknown peer cannot be the highest-priority member; avoid dereferencing null below.
+      return false;
+    }
+    for (RaftPeer peer : getCurrentPeers()) {
+      // Skip the target itself (by id), then require every other peer to rank strictly lower.
+      if (!peer.getId().equals(peerId) && peer.getPriority() >= target.getPriority()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
boolean containsInOldConf(RaftPeerId peerId) {
return oldConf != null && oldConf.contains(peerId);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e13e432..22494b3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
@@ -32,6 +33,7 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
@@ -169,6 +171,8 @@ class RaftServerImpl implements RaftServer.Division,
// So happens IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING,
private final AtomicBoolean startComplete;
+ private final TransferLeadership transferLeadership;
+
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -204,6 +208,8 @@ class RaftServerImpl implements RaftServer.Division,
.build();
return client;
});
+
+ this.transferLeadership = new TransferLeadership(this);
}
@Override
@@ -589,7 +595,8 @@ class RaftServerImpl implements RaftServer.Division,
/**
* @return null if the server is in leader state.
*/
- private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
+ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry,
+ boolean isWrite) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
@@ -610,6 +617,13 @@ class RaftServerImpl implements RaftServer.Division,
final RaftClientReply reply = newExceptionReply(request, lnre);
return RetryCacheImpl.failWithReply(reply, entry);
}
+
+ if (isWrite && isSteppingDown()) {
+ final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId());
+ final RaftClientReply reply = newExceptionReply(request, lsde);
+ return RetryCacheImpl.failWithReply(reply, entry);
+ }
+
return null;
}
@@ -653,7 +667,7 @@ class RaftServerImpl implements RaftServer.Division,
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request, cacheEntry);
+ reply = checkLeaderState(request, cacheEntry, true);
if (reply != null) {
return reply;
}
@@ -718,7 +732,8 @@ class RaftServerImpl implements RaftServer.Division,
replyFuture = staleReadAsync(request);
} else {
// first check the server's leader state
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null,
+ !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
if (reply != null) {
return reply;
}
@@ -866,18 +881,68 @@ class RaftServerImpl implements RaftServer.Division,
+ /**
+ * Blocking form of transferLeadershipAsync: waits for its reply via waitForReply and returns it.
+ */
@Override
public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
- //TODO(runzhiwang): implement transfer leadership in server
- return null;
+ return waitForReply(request, transferLeadershipAsync(request));
+ }
+
+  /**
+   * Log {@code msg} at WARN level, then return an already-completed future whose reply carries a
+   * {@link TransferLeadershipException} with the same message.
+   */
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    final TransferLeadershipException exception = new TransferLeadershipException(getMemberId(), msg);
+    final RaftClientReply failure = newExceptionReply(request, exception);
+    return CompletableFuture.completedFuture(failure);
+  }
+
+  /** @return true while a leadership transfer requested of this server is still pending. */
+  boolean isSteppingDown() {
+    final boolean steppingDown = transferLeadership.isSteppingDown();
+    return steppingDown;
+  }
+
+  /**
+   * Resolve any pending leadership transfer against the leader currently recorded in the state.
+   * ServerState invokes this after recording a new non-null leader.
+   */
+  void finishTransferLeadership() {
+    final RaftPeerId currentLeader = state.getLeaderId();
+    transferLeadership.finish(currentLeader, false);
}
+  /**
+   * Asynchronously transfer leadership of this group to {@code request.getNewLeader()}.
+   *
+   * The request is refused, with a {@link TransferLeadershipException} reply, in three cases:
+   * a raft reconfiguration is in progress, the target is not in the configuration, or the target
+   * does not have the highest priority. Transferring to this server itself succeeds immediately.
+   * Otherwise the returned future is the one created by TransferLeadership#start.
+   */
@Override
public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
throws IOException {
- //TODO(runzhiwang): implement transfer leadership in server
- return null;
+    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+    synchronized (this) {
+      // Only a leader can transfer leadership. isWrite=false so that the stepping-down check in
+      // checkLeaderState does not reject this request; TransferLeadership#start handles duplicates.
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
+      if (reply != null) {
+        return reply;
+      }
+
+      if (getId().equals(request.getNewLeader())) {
+        return CompletableFuture.completedFuture(newSuccessReply(request));
+      }
+
+      final RaftConfigurationImpl conf = getRaftConf();
+      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+
+      // make sure there is no raft reconfiguration in progress
+      if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " because a raft reconfiguration is in progress.";
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.containsInConf(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it is not in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.isHighestPriority(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it does not have the highest priority in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      return transferLeadership.start(request);
+    }
}
- @Override
+ @Override
public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
return waitForReply(request, setConfigurationAsync(request));
}
@@ -891,7 +956,7 @@ class RaftServerImpl implements RaftServer.Division,
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
- CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
+ CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true);
if (reply != null) {
return reply;
}
@@ -899,7 +964,7 @@ class RaftServerImpl implements RaftServer.Division,
final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
final PendingRequest pending;
synchronized (this) {
- reply = checkLeaderState(request, null);
+ reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 466eeb2..908f0e7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -249,6 +249,9 @@ class ServerState implements Closeable {
LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
leaderId = newLeaderId;
+ if (leaderId != null) {
+ server.finishTransferLeadership();
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
new file mode 100644
index 0000000..b6db5e3
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the (at most one) pending leadership-transfer request of a {@link RaftServerImpl}.
+ *
+ * While a request is pending, {@link #isSteppingDown()} returns true. A pending request is resolved
+ * in one of two ways:
+ * - {@link #finish(RaftPeerId, boolean)}, which the server calls when it records a new leader;
+ * - the one-minute timeout scheduled by {@link #start(TransferLeadershipRequest)}.
+ */
+public class TransferLeadership {
+  public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
+
+  /** A transfer request together with the future that carries its reply. */
+  class PendingRequest {
+    private final TransferLeadershipRequest request;
+    private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+    PendingRequest(TransferLeadershipRequest request) {
+      this.request = request;
+    }
+
+    TransferLeadershipRequest getRequest() {
+      return request;
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    /**
+     * Complete the reply future, unless it is already done.
+     *
+     * The reply succeeds if {@code currentLeader} is the requested new leader. Otherwise it fails
+     * with a {@link TransferLeadershipException}. Callers have already removed this request from
+     * {@code pending}, so nothing else would complete it. Leaving the future incomplete here would
+     * leave the client waiting forever.
+     *
+     * @param currentLeader the leader currently known to the server; may be null.
+     * @param timeout true if this completion is triggered by the timeout.
+     */
+    void complete(RaftPeerId currentLeader, boolean timeout) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+
+      if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
+        replyFuture.complete(server.newSuccessReply(request));
+      } else {
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+            "Failed to transfer leadership to " + request.getNewLeader() + ": current leader is " + currentLeader
+                + (timeout? " (timed out)": ""));
+        replyFuture.complete(server.newExceptionReply(request, tle));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return request.toString();
+    }
+  }
+
+  private final RaftServerImpl server;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  /** The pending request, or null if no transfer is in progress. */
+  private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
+
+  TransferLeadership(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  /** @return true iff a transfer request is pending. */
+  boolean isSteppingDown() {
+    return pending.get() != null;
+  }
+
+  /**
+   * Start transferring leadership as requested.
+   *
+   * If another request is already pending, the new request follows that request's outcome when
+   * both name the same new leader. Otherwise the new request fails immediately.
+   *
+   * @return a future completed when the transfer succeeds, fails or times out.
+   */
+  CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
+    final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
+    final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
+    if (previous != null) {
+      if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
+        final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+        previous.getReplyFuture().whenComplete((r, e) -> {
+          if (e != null) {
+            replyFuture.completeExceptionally(e);
+          } else {
+            replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
+                : server.newExceptionReply(request, r.getException()));
+          }
+        });
+        return replyFuture;
+      } else {
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+            "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
+        return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
+      }
+    }
+
+    final PendingRequest current = supplier.get();
+    // Time out only this request. By the time the timeout fires, this request may already be
+    // finished and a newer one may be pending; that newer request must not be failed early.
+    scheduler.onTimeout(TimeDuration.ONE_MINUTE,
+        () -> finishIfPending(current, server.getState().getLeaderId(), true),
+        LOG, () -> "Timeout check failed for transfer leadership request: " + request);
+    return current.getReplyFuture();
+  }
+
+  /**
+   * Remove the pending request, if any, and complete it against the given leader.
+   *
+   * @param currentLeader the leader currently known to the server.
+   * @param timeout true if triggered by a timeout.
+   */
+  void finish(RaftPeerId currentLeader, boolean timeout) {
+    Optional.ofNullable(pending.getAndSet(null))
+        .ifPresent(r -> r.complete(currentLeader, timeout));
+  }
+
+  /** Complete {@code request} only if it is still the pending request; otherwise do nothing. */
+  private void finishIfPending(PendingRequest request, RaftPeerId currentLeader, boolean timeout) {
+    if (pending.compareAndSet(request, null)) {
+      request.complete(currentLeader, timeout);
+    }
+  }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index e50267a..416c17a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -77,17 +77,6 @@ public abstract class GroupManagementBaseTest extends BaseTest {
return getClusterFactory().newCluster(peerNum, prop);
}
- private List<RaftPeer> getPeersWithPriority(List<RaftPeer> peers, int suggestedLeaderIndex) {
- List<RaftPeer> peersWithPriority = new ArrayList<>();
- for (int i = 0; i < peers.size(); i++) {
- RaftPeer peer = peers.get(i);
- final int priority = i == suggestedLeaderIndex? 2: 1;
- peersWithPriority.add(
- RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(priority).build());
- }
- return peersWithPriority;
- }
-
@Test
public void testGroupWithPriority() throws Exception {
final MiniRaftCluster cluster = getCluster(0);
@@ -110,7 +99,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
List<RaftPeer> peers = cluster.getPeers();
Random r = new Random(1);
final int suggestedLeaderIndex = r.nextInt(peers.size());
- List<RaftPeer> peersWithPriority = getPeersWithPriority(peers, suggestedLeaderIndex);
+ List<RaftPeer> peersWithPriority = getPeersWithPriority(peers, peers.get(suggestedLeaderIndex));
final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersWithPriority);
LOG.info("add new group: " + newGroup);
try (final RaftClient client = cluster.createClient(newGroup)) {
@@ -162,7 +151,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
// change the suggest leader
final int newSuggestedLeaderIndex = (suggestedLeaderIndex + 1) % peersWithPriority.size();
- List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newSuggestedLeaderIndex);
+ List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, peers.get(newSuggestedLeaderIndex));
try (final RaftClient client = cluster.createClient(newGroup)) {
RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index cf9cfda..3cd843a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -25,6 +25,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
@@ -43,6 +44,7 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -117,6 +119,41 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
@Test
+  public void testTransferLeader() throws Exception {
+    // Transfer leadership to a follower after giving it the highest priority, then verify that
+    // the follower becomes leader and that writes still succeed.
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        Assert.assertTrue(client.io().send(new RaftTestUtil.SimpleMessage("message")).isSuccess());
+        Thread.sleep(1000);
+
+        final List<RaftServer.Division> followers = cluster.getFollowers();
+        Assert.assertEquals(2, followers.size());
+        final RaftServer.Division newLeader = followers.get(0);
+
+        // The server only accepts a transfer to a peer with the highest priority, so raise it first.
+        final List<RaftPeer> peers = cluster.getPeers();
+        final List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
+        RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+        Assert.assertTrue(reply.isSuccess());
+
+        reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId());
+        Assert.assertTrue(reply.isSuccess());
+        Thread.sleep(1000);
+
+        final RaftServer.Division currLeader = waitForLeader(cluster);
+        // Compare ids by value; == on RaftPeerId only checks object identity.
+        Assert.assertEquals(newLeader.getId(), currLeader.getId());
+
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertNotEquals(leader.getId(), reply.getReplierId());
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
+ @Test
public void testEnforceLeader() throws Exception {
LOG.info("Running testEnforceLeader");
final int numServer = 5;