You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/09/09 07:25:02 UTC
[ratis] branch master updated: RATIS-1392. Cache leader information
in RaftClientImpl. (#490)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new da4fd87 RATIS-1392. Cache leader information in RaftClientImpl. (#490)
da4fd87 is described below
commit da4fd87a17a6364f9420902a533b7cdfba40958c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Sep 9 15:24:55 2021 +0800
RATIS-1392. Cache leader information in RaftClientImpl. (#490)
---
.../org/apache/ratis/client/impl/BlockingImpl.java | 2 +-
.../apache/ratis/client/impl/ClientProtoUtils.java | 26 ++++++++++++------
.../org/apache/ratis/client/impl/OrderedAsync.java | 1 +
.../apache/ratis/client/impl/RaftClientImpl.java | 31 +++++++++++++++++++---
.../apache/ratis/client/impl/UnorderedAsync.java | 1 +
.../apache/ratis/protocol/GroupInfoRequest.java | 2 +-
.../apache/ratis/protocol/GroupListRequest.java | 2 +-
.../ratis/protocol/GroupManagementRequest.java | 2 +-
.../apache/ratis/protocol/RaftClientRequest.java | 27 +++++++++++++++----
.../ratis/protocol/SetConfigurationRequest.java | 2 +-
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/MiniRaftCluster.java | 2 +-
.../server/impl/TestRatisServerMetricsBase.java | 2 +-
13 files changed, 78 insertions(+), 23 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index ee15c5a..8decf96 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -96,7 +96,7 @@ class BlockingImpl implements BlockingApi {
final RaftClientReply reply = sendRequest(request);
if (reply != null) {
- return reply;
+ return client.handleReply(request, reply);
}
} catch (GroupMismatchException | StateMachineException | TransferLeadershipException |
LeaderSteppingDownException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index bb706e7..0379353 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -64,11 +64,12 @@ public interface ClientProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
- replyId.toByteString(), requestorId.getGroupId(), null, null, null, 0);
+ replyId.toByteString(), requestorId.getGroupId(), null, false, null, null, 0);
}
+ @SuppressWarnings("parameternumber")
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId,
+ ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId, boolean toLeader,
SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
if (slidingWindowEntry == null) {
slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
@@ -79,6 +80,7 @@ public interface ClientProtoUtils {
.setReplyId(replyId)
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
.setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
+ .setToLeader(toLeader)
.setSlidingWindowEntry(slidingWindowEntry)
.setTimeoutMs(timeoutMs);
@@ -89,11 +91,12 @@ public interface ClientProtoUtils {
return b;
}
+ @SuppressWarnings("parameternumber")
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
+ ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, boolean toLeader,
SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
return toRaftRpcRequestProtoBuilder(
- requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable,
+ requesterId.toByteString(), replyId.toByteString(), groupId, callId, toLeader, slidingWindowEntry, routingTable,
timeoutMs);
}
@@ -104,6 +107,7 @@ public interface ClientProtoUtils {
request.getServerId(),
request.getRaftGroupId(),
request.getCallId(),
+ request.isToLeader(),
request.getSlidingWindowEntry(),
request.getRoutingTable(),
request.getTimeoutMs());
@@ -151,9 +155,15 @@ public interface ClientProtoUtils {
final RaftClientRequest.Type type = toRaftClientRequestType(p);
final RaftRpcRequestProto request = p.getRpcRequest();
- return RaftClientRequest.newBuilder()
- .setClientId(ClientId.valueOf(request.getRequestorId()))
- .setServerId(RaftPeerId.valueOf(request.getReplyId()))
+ final RaftClientRequest.Builder b = RaftClientRequest.newBuilder();
+
+ final RaftPeerId perrId = RaftPeerId.valueOf(request.getReplyId());
+ if (request.getToLeader()) {
+ b.setLeaderId(perrId);
+ } else {
+ b.setServerId(perrId);
+ }
+ return b.setClientId(ClientId.valueOf(request.getRequestorId()))
.setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
.setCallId(request.getCallId())
.setMessage(toMessage(p.getMessage()))
@@ -211,7 +221,7 @@ public interface ClientProtoUtils {
long seqNum, ByteString content) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(
- clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
+ clientId, serverId, groupId, callId, false, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
.setWrite(WriteRequestTypeProto.getDefaultInstance())
.setMessage(toClientMessageEntryProtoBuilder(content))
.build();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 043181d..2c2a0e2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -193,6 +193,7 @@ public final class OrderedAsync {
if (reply == null) {
scheduleWithTimeout(pending, request, retryPolicy, null);
} else {
+ client.handleReply(request, reply);
f.complete(reply);
}
}).exceptionally(e -> {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9c3aac8..ce7150b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -38,6 +38,8 @@ import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
+import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
@@ -65,6 +67,11 @@ import java.util.function.Supplier;
/** A client who sends requests to a raft service. */
public final class RaftClientImpl implements RaftClient {
+ private static final Cache<RaftGroupId, RaftPeerId> LEADER_CACHE = CacheBuilder.newBuilder()
+ .expireAfterAccess(60, TimeUnit.SECONDS)
+ .maximumSize(1024)
+ .build();
+
public abstract static class PendingClientRequest {
private final long creationTimeInMs = System.currentTimeMillis();
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -139,6 +146,13 @@ public final class RaftClientImpl implements RaftClient {
this.clientId = clientId;
this.peers.set(group.getPeers());
this.groupId = group.getGroupId();
+
+ if (leaderId == null) {
+ final RaftPeerId cached = LEADER_CACHE.getIfPresent(groupId);
+ if (cached != null && group.getPeer(cached) != null) {
+ leaderId = cached;
+ }
+ }
this.leaderId = leaderId != null? leaderId : getHighestPriorityPeerId();
this.retryPolicy = Objects.requireNonNull(retryPolicy, "retry policy can't be null");
@@ -204,9 +218,13 @@ public final class RaftClientImpl implements RaftClient {
RaftClientRequest newRaftClientRequest(
RaftPeerId server, long callId, Message message, RaftClientRequest.Type type,
SlidingWindowEntry slidingWindowEntry) {
- return RaftClientRequest.newBuilder()
- .setClientId(clientId)
- .setServerId(server != null? server: leaderId)
+ final RaftClientRequest.Builder b = RaftClientRequest.newBuilder();
+ if (server != null) {
+ b.setServerId(server);
+ } else {
+ b.setLeaderId(leaderId);
+ }
+ return b.setClientId(clientId)
.setGroupId(groupId)
.setCallId(callId)
.setMessage(message)
@@ -254,6 +272,13 @@ public final class RaftClientImpl implements RaftClient {
return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable);
}
+ RaftClientReply handleReply(RaftClientRequest request, RaftClientReply reply) {
+ if (request.isToLeader() && reply != null && reply.getException() == null) {
+ LEADER_CACHE.put(reply.getRaftGroupId(), reply.getServerId());
+ }
+ return reply;
+ }
+
static <E extends Throwable> RaftClientReply handleRaftException(
RaftClientReply reply, Function<RaftException, E> converter) throws E {
if (reply != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 43ed7a9..432acce 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -80,6 +80,7 @@ public interface UnorderedAsync {
final RaftException replyException = reply != null? reply.getException(): null;
reply = client.handleLeaderException(request, reply);
if (reply != null) {
+ client.handleReply(request, reply);
f.complete(reply);
return;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
index 567c2be..a62495e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
@@ -23,6 +23,6 @@ package org.apache.ratis.protocol;
*/
public class GroupInfoRequest extends RaftClientRequest {
public GroupInfoRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
- super(clientId, serverId, groupId, callId, readRequestType());
+ super(clientId, serverId, groupId, callId, false, readRequestType());
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
index af38b6a..e28e7b1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
@@ -23,6 +23,6 @@ package org.apache.ratis.protocol;
*/
public class GroupListRequest extends RaftClientRequest {
public GroupListRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
- super(clientId, serverId, groupId, callId, readRequestType());
+ super(clientId, serverId, groupId, callId, false, readRequestType());
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index 4385ba2..d370dfc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -92,7 +92,7 @@ public final class GroupManagementRequest extends RaftClientRequest {
private final Op op;
private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) {
- super(clientId, serverId, op.getGroupId(), callId, writeRequestType());
+ super(clientId, serverId, op.getGroupId(), callId, false, writeRequestType());
this.op = op;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 6f7a28b..8fef42d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -232,6 +232,7 @@ public class RaftClientRequest extends RaftClientMessage {
private RaftPeerId serverId;
private RaftGroupId groupId;
private long callId;
+ private boolean toLeader;
private Message message;
private Type type;
@@ -241,7 +242,7 @@ public class RaftClientRequest extends RaftClientMessage {
public RaftClientRequest build() {
return new RaftClientRequest(
- clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable, timeoutMs);
+ clientId, serverId, groupId, callId, toLeader, message, type, slidingWindowEntry, routingTable, timeoutMs);
}
public Builder setClientId(ClientId clientId) {
@@ -249,8 +250,15 @@ public class RaftClientRequest extends RaftClientMessage {
return this;
}
+ public Builder setLeaderId(RaftPeerId leaderId) {
+ this.serverId = leaderId;
+ this.toLeader = true;
+ return this;
+ }
+
public Builder setServerId(RaftPeerId serverId) {
this.serverId = serverId;
+ this.toLeader = false;
return this;
}
@@ -316,21 +324,26 @@ public class RaftClientRequest extends RaftClientMessage {
private final long timeoutMs;
- protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
- this(clientId, serverId, groupId, callId, null, type, null, null, 0);
+ private final boolean toLeader;
+
+ protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
+ boolean toLeader, Type type) {
+ this(clientId, serverId, groupId, callId, toLeader, null, type, null, null, 0);
}
protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type,
long timeoutMs) {
- this(clientId, serverId, groupId, callId, null, type, null, null, timeoutMs);
+ this(clientId, serverId, groupId, callId, true, null, type, null, null, timeoutMs);
}
@SuppressWarnings("parameternumber")
private RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
- long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
+ long callId, boolean toLeader, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
RoutingTable routingTable, long timeoutMs) {
super(clientId, serverId, groupId, callId);
+ this.toLeader = toLeader;
+
this.message = message;
this.type = type;
this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
@@ -343,6 +356,10 @@ public class RaftClientRequest extends RaftClientMessage {
return true;
}
+ public boolean isToLeader() {
+ return toLeader;
+ }
+
public SlidingWindowEntry getSlidingWindowEntry() {
return slidingWindowEntry;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index ae8219b..5a8fc21 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -27,7 +27,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, List<RaftPeer> peers) {
- super(clientId, serverId, groupId, callId, writeRequestType());
+ super(clientId, serverId, groupId, callId, true, writeRequestType());
this.peers = peers != null? Collections.unmodifiableList(peers): Collections.emptyList();
Preconditions.assertUnique(this.peers);
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c98f0d7..092e18c 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -112,6 +112,7 @@ message RaftRpcRequestProto {
bytes replyId = 2;
RaftGroupIdProto raftGroupId = 3;
uint64 callId = 4;
+ bool toLeader = 5;
uint64 timeoutMs = 13;
RoutingTableProto routingTable = 14;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 955bb42..5247d23 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -728,7 +728,7 @@ public abstract class MiniRaftCluster implements Closeable {
ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
- .setServerId(leaderId)
+ .setLeaderId(leaderId)
.setGroupId(getGroupId())
.setCallId(callId)
.setMessage(message)
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
index 838db86..3104f08 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -60,7 +60,7 @@ public abstract class TestRatisServerMetricsBase<CLUSTER extends MiniRaftCluster
// StaleRead with Long.MAX_VALUE minIndex will fail.
RaftClientRequest r = RaftClientRequest.newBuilder()
.setClientId(clientId)
- .setServerId(leaderImpl.getId())
+ .setLeaderId(leaderImpl.getId())
.setGroupId(cluster.getGroupId())
.setCallId(0)
.setMessage(Message.EMPTY)