You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was lost in the plain-text conversion.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/25 11:08:01 UTC
[incubator-ratis] branch master updated: RATIS-1261. Add timeout in TransferLeadershipRequest (#373)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c219b98 RATIS-1261. Add timeout in TransferLeadershipRequest (#373)
c219b98 is described below
commit c219b9841eaa239c965faa850c2cf9685f8d42b9
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 25 19:07:52 2020 +0800
RATIS-1261. Add timeout in TransferLeadershipRequest (#373)
* RATIS-1261. Add timeout in TransferLeadershipRequest
---
.../java/org/apache/ratis/client/RaftClient.java | 2 +-
.../org/apache/ratis/client/impl/BlockingImpl.java | 5 +-
.../apache/ratis/client/impl/ClientProtoUtils.java | 21 ++++---
.../apache/ratis/client/impl/RaftClientImpl.java | 5 +-
.../apache/ratis/protocol/RaftClientRequest.java | 24 +++++++-
.../ratis/protocol/TransferLeadershipRequest.java | 4 +-
.../exceptions/LeaderSteppingDownException.java | 12 ++--
.../exceptions/TransferLeadershipException.java | 12 ++--
.../java/org/apache/ratis/util/TimeDuration.java | 5 ++
.../grpc/client/GrpcClientProtocolClient.java | 5 +-
.../java/org/apache/ratis/netty/NettyRpcProxy.java | 4 +-
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/LeaderStateImpl.java | 8 +--
.../apache/ratis/server/impl/RaftServerImpl.java | 4 +-
.../ratis/server/impl/TransferLeadership.java | 10 +--
.../ratis/server/impl/LeaderElectionTests.java | 71 ++++++++++++++++++++--
16 files changed, 144 insertions(+), 49 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 0adf35b..78e0b77 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -69,7 +69,7 @@ public interface RaftClient extends Closeable {
RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
/** Transfer leadership to the given server.*/
- RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader) throws IOException;
+ RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader, long timeoutMs) throws IOException;
/** @return a {@link Builder}. */
static Builder newBuilder() {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 7d03e62..ee15c5a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -32,7 +32,9 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.TimeDuration;
@@ -96,7 +98,8 @@ class BlockingImpl implements BlockingApi {
if (reply != null) {
return reply;
}
- } catch (GroupMismatchException | StateMachineException e) {
+ } catch (GroupMismatchException | StateMachineException | TransferLeadershipException |
+ LeaderSteppingDownException e) {
throw e;
} catch (IOException e) {
ioe = e;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 92459fd..d00c2b4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -63,12 +63,12 @@ public interface ClientProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
- replyId.toByteString(), requestorId.getGroupId(), null, null, null);
+ replyId.toByteString(), requestorId.getGroupId(), null, null, null, 0);
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId,
- SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable) {
+ SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
if (slidingWindowEntry == null) {
slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
}
@@ -78,7 +78,8 @@ public interface ClientProtoUtils {
.setReplyId(replyId)
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
.setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
- .setSlidingWindowEntry(slidingWindowEntry);
+ .setSlidingWindowEntry(slidingWindowEntry)
+ .setTimeoutMs(timeoutMs);
if (routingTable != null) {
b.setRoutingTable(routingTable.toProto());
@@ -89,9 +90,10 @@ public interface ClientProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
- SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable) {
+ SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
return toRaftRpcRequestProtoBuilder(
- requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable);
+ requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable,
+ timeoutMs);
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
@@ -102,7 +104,8 @@ public interface ClientProtoUtils {
request.getRaftGroupId(),
request.getCallId(),
request.getSlidingWindowEntry(),
- request.getRoutingTable());
+ request.getRoutingTable(),
+ request.getTimeoutMs());
}
static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
@@ -156,6 +159,7 @@ public interface ClientProtoUtils {
.setType(type)
.setSlidingWindowEntry(request.getSlidingWindowEntry())
.setRoutingTable(getRoutingTable(request))
+ .setTimeoutMs(request.getTimeoutMs())
.build();
}
@@ -206,7 +210,7 @@ public interface ClientProtoUtils {
long seqNum, ByteString content) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(
- clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null))
+ clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
.setWrite(WriteRequestTypeProto.getDefaultInstance())
.setMessage(toClientMessageEntryProtoBuilder(content))
.build();
@@ -441,7 +445,8 @@ public interface ClientProtoUtils {
RaftPeerId.valueOf(m.getReplyId()),
ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
p.getRpcRequest().getCallId(),
- newLeader.getId());
+ newLeader.getId(),
+ m.getTimeoutMs());
}
static TransferLeadershipRequestProto toTransferLeadershipRequestProto(
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b318a28..25a1586 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -221,11 +221,12 @@ public final class RaftClientImpl implements RaftClient {
}
@Override
- public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader) throws IOException {
+ public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader, long timeoutMs)
+ throws IOException {
Objects.requireNonNull(newLeader, "newLeader == null");
final long callId = CallId.getAndIncrement();
return io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
- clientId, leaderId, groupId, callId, newLeader));
+ clientId, leaderId, groupId, callId, newLeader, timeoutMs));
}
// TODO: change peersInNewConf to List<RaftPeer>
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 4aca94f..6f7a28b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -237,10 +237,11 @@ public class RaftClientRequest extends RaftClientMessage {
private Type type;
private SlidingWindowEntry slidingWindowEntry;
private RoutingTable routingTable;
+ private long timeoutMs;
public RaftClientRequest build() {
return new RaftClientRequest(
- clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable);
+ clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable, timeoutMs);
}
public Builder setClientId(ClientId clientId) {
@@ -282,6 +283,11 @@ public class RaftClientRequest extends RaftClientMessage {
this.routingTable = routingTable;
return this;
}
+
+ public Builder setTimeoutMs(long timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ return this;
+ }
}
public static Builder newBuilder() {
@@ -308,20 +314,28 @@ public class RaftClientRequest extends RaftClientMessage {
private final RoutingTable routingTable;
+ private final long timeoutMs;
+
protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
- this(clientId, serverId, groupId, callId, null, type, null, null);
+ this(clientId, serverId, groupId, callId, null, type, null, null, 0);
+ }
+
+ protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type,
+ long timeoutMs) {
+ this(clientId, serverId, groupId, callId, null, type, null, null, timeoutMs);
}
@SuppressWarnings("parameternumber")
private RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
- RoutingTable routingTable) {
+ RoutingTable routingTable, long timeoutMs) {
super(clientId, serverId, groupId, callId);
this.message = message;
this.type = type;
this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
this.routingTable = routingTable;
+ this.timeoutMs = timeoutMs;
}
@Override
@@ -349,6 +363,10 @@ public class RaftClientRequest extends RaftClientMessage {
return routingTable;
}
+ public long getTimeoutMs() {
+ return timeoutMs;
+ }
+
@Override
public String toString() {
return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
index 979ba7a..c80e489 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
@@ -21,8 +21,8 @@ public class TransferLeadershipRequest extends RaftClientRequest {
private final RaftPeerId newLeader;
public TransferLeadershipRequest(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader) {
- super(clientId, serverId, groupId, callId, readRequestType());
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader, long timeoutMs) {
+ super(clientId, serverId, groupId, callId, readRequestType(), timeoutMs);
this.newLeader = newLeader;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
index bc81893..a1d7ea9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
@@ -17,17 +17,13 @@
*/
package org.apache.ratis.protocol.exceptions;
-import org.apache.ratis.protocol.RaftGroupMemberId;
-
public class LeaderSteppingDownException extends RaftException {
- private final RaftGroupMemberId serverId;
- public LeaderSteppingDownException(RaftGroupMemberId id) {
- super(id + " is in steppingDown");
- this.serverId = id;
+ public LeaderSteppingDownException(String message) {
+ super(message);
}
- public RaftGroupMemberId getServerId() {
- return serverId;
+ public LeaderSteppingDownException(String message, Throwable t) {
+ super(message, t);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
index 2a503aa..f859a28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
@@ -17,17 +17,13 @@
*/
package org.apache.ratis.protocol.exceptions;
-import org.apache.ratis.protocol.RaftGroupMemberId;
-
public class TransferLeadershipException extends RaftException {
- private final RaftGroupMemberId serverId;
- public TransferLeadershipException(RaftGroupMemberId id, String msg) {
- super(msg);
- this.serverId = id;
+ public TransferLeadershipException(String message) {
+ super(message);
}
- public RaftGroupMemberId getServerId() {
- return serverId;
+ public TransferLeadershipException(String message, Throwable t) {
+ super(message, t);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 54d0dab..a7d2e41 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -205,6 +205,11 @@ public final class TimeDuration implements Comparable<TimeDuration> {
return valueOf(this.toLong(minUnit) + that.toLong(minUnit), minUnit);
}
+ /** @return (this + (thatDuration, thatUnit)) in the minimum unit among them. */
+ public TimeDuration add(long thatDuration, TimeUnit thatUnit) {
+ return add(TimeDuration.valueOf(thatDuration, thatUnit));
+ }
+
/** @return (this - that) in the minimum unit among them. */
public TimeDuration subtract(TimeDuration that) {
Objects.requireNonNull(that, "that == null");
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index e29baa4..1d366f9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -68,6 +68,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -181,8 +182,10 @@ public class GrpcClientProtocolClient implements Closeable {
RaftClientReplyProto transferLeadership(
TransferLeadershipRequestProto request) throws IOException {
+ TimeDuration newDuration = requestTimeoutDuration.add(
+ request.getRpcRequest().getTimeoutMs(), TimeUnit.MILLISECONDS);
return blockingCall(() -> blockingStub
- .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+ .withDeadlineAfter(newDuration.getDuration(), newDuration.getUnit())
.transferLeadership(request));
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 52f9915..65a8052 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -42,6 +42,7 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
@@ -182,7 +183,8 @@ public class NettyRpcProxy implements Closeable {
try {
channelFuture.sync();
- return reply.get(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit());
+ TimeDuration newDuration = requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+ return reply.get(newDuration.getDuration(), newDuration.getUnit());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException(ProtoUtils.toString(request)
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 468c458..1d5e57e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -111,6 +111,7 @@ message RaftRpcRequestProto {
RaftGroupIdProto raftGroupId = 3;
uint64 callId = 4;
+ uint64 timeoutMs = 13;
RoutingTableProto routingTable = 14;
SlidingWindowEntry slidingWindowEntry = 15;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8b6d391..d54e337 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -872,9 +872,9 @@ class LeaderStateImpl implements LeaderState {
final TermIndex leaderLastEntry = server.getState().getLastEntry();
if (leaderLastEntry == null) {
- LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+ LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
"and leader's lastEntry is null",
- this, currentTerm, followerPriority, leaderPriority);
+ this, currentTerm, followerID, followerPriority, leaderPriority);
// step down as follower
yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
@@ -882,9 +882,9 @@ class LeaderStateImpl implements LeaderState {
}
if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
- LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+ LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
"and follower's lastEntry index:{} catch up with leader's:{}",
- this, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
+ this, currentTerm, followerID, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
leaderLastEntry.getIndex());
// step down as follower
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 22494b3..f24c3bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -619,7 +619,7 @@ class RaftServerImpl implements RaftServer.Division,
}
if (isWrite && isSteppingDown()) {
- final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId());
+ final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
final RaftClientReply reply = newExceptionReply(request, lsde);
return RetryCacheImpl.failWithReply(reply, entry);
}
@@ -888,7 +888,7 @@ class RaftServerImpl implements RaftServer.Division,
TransferLeadershipRequest request, String msg) {
LOG.warn(msg);
return CompletableFuture.completedFuture(
- newExceptionReply(request, new TransferLeadershipException(getMemberId(), msg)));
+ newExceptionReply(request, new TransferLeadershipException(msg)));
}
boolean isSteppingDown() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index b6db5e3..03b1dfc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class TransferLeadership {
@@ -59,8 +60,9 @@ public class TransferLeadership {
if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
replyFuture.complete(server.newSuccessReply(request));
} else if (timeout) {
- final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
- "Failed to transfer leadership to " + request.getNewLeader() + ": current leader is " + currentLeader);
+ final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId()
+ + ": Failed to transfer leadership to " + request.getNewLeader()
+ + " (timed out " + request.getTimeoutMs() + "ms): current leader is " + currentLeader);
replyFuture.complete(server.newExceptionReply(request, tle));
}
}
@@ -99,13 +101,13 @@ public class TransferLeadership {
});
return replyFuture;
} else {
- final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+ final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
"Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
}
}
- scheduler.onTimeout(TimeDuration.ONE_MINUTE,
+ scheduler.onTimeout(TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS),
() -> finish(server.getState().getLeaderId(), true),
LOG, () -> "Timeout check failed for append entry request: " + request);
return supplier.get().getReplyFuture();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 3cd843a..4a45bbe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -27,6 +27,8 @@ import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -46,6 +48,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -126,7 +129,6 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final RaftServer.Division leader = waitForLeader(cluster);
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
- Thread.sleep(1000);
List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertEquals(followers.size(), 2);
@@ -137,15 +139,14 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
Assert.assertTrue(reply.isSuccess());
- reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId());
+ reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), 20000);
assertTrue(reply.isSuccess());
- Thread.sleep(1000);
final RaftServer.Division currLeader = waitForLeader(cluster);
assertTrue(newLeader.getId() == currLeader.getId());
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
- Assert.assertNotEquals(reply.getReplierId(), leader.getId());
+ Assert.assertTrue(reply.getReplierId().equals(newLeader.getId().toString()));
Assert.assertTrue(reply.isSuccess());
}
@@ -154,6 +155,68 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
@Test
+ public void testTransferLeaderTimeout() throws Exception {
+ try(final MiniRaftCluster cluster = newCluster(3)) {
+ cluster.start();
+
+ final RaftServer.Division leader = waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ List<RaftServer.Division> followers = cluster.getFollowers();
+ Assert.assertEquals(followers.size(), 2);
+ RaftServer.Division newLeader = followers.get(0);
+
+ // isolate new leader, so that transfer leadership will timeout
+ isolate(cluster, newLeader.getId());
+
+ List<RaftPeer> peers = cluster.getPeers();
+ List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
+ RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+ Assert.assertTrue(reply.isSuccess());
+
+ CompletableFuture<Boolean> transferTimeoutFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ long timeoutMs = 5000;
+ long start = System.currentTimeMillis();
+ try {
+ client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), timeoutMs);
+ } catch (TransferLeadershipException e) {
+ long cost = System.currentTimeMillis() - start;
+ Assert.assertTrue(cost > timeoutMs);
+ Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to"));
+ Assert.assertTrue(e.getMessage().contains("timed out"));
+ }
+
+ return true;
+ } catch (IOException e) {
+ return false;
+ }
+ });
+
+ // before transfer timeout, leader should in steppingDown
+ JavaUtils.attemptRepeatedly(() -> {
+ try {
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ } catch (LeaderSteppingDownException e) {
+ Assert.assertTrue(e.getMessage().contains("is stepping down"));
+ }
+ return null;
+ }, 5, TimeDuration.ONE_SECOND, "check leader steppingDown", RaftServer.LOG);
+
+ Assert.assertTrue(transferTimeoutFuture.get());
+
+ // after transfer timeout, leader should accept request
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
+ Assert.assertTrue(reply.isSuccess());
+
+ deIsolate(cluster, newLeader.getId());
+ }
+
+ cluster.shutdown();
+ }
+ }
+
+ @Test
public void testEnforceLeader() throws Exception {
LOG.info("Running testEnforceLeader");
final int numServer = 5;