You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2021/09/09 07:25:02 UTC

[ratis] branch master updated: RATIS-1392. Cache leader information in RaftClientImpl. (#490)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new da4fd87  RATIS-1392. Cache leader information in RaftClientImpl. (#490)
da4fd87 is described below

commit da4fd87a17a6364f9420902a533b7cdfba40958c
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Sep 9 15:24:55 2021 +0800

    RATIS-1392. Cache leader information in RaftClientImpl. (#490)
---
 .../org/apache/ratis/client/impl/BlockingImpl.java |  2 +-
 .../apache/ratis/client/impl/ClientProtoUtils.java | 26 ++++++++++++------
 .../org/apache/ratis/client/impl/OrderedAsync.java |  1 +
 .../apache/ratis/client/impl/RaftClientImpl.java   | 31 +++++++++++++++++++---
 .../apache/ratis/client/impl/UnorderedAsync.java   |  1 +
 .../apache/ratis/protocol/GroupInfoRequest.java    |  2 +-
 .../apache/ratis/protocol/GroupListRequest.java    |  2 +-
 .../ratis/protocol/GroupManagementRequest.java     |  2 +-
 .../apache/ratis/protocol/RaftClientRequest.java   | 27 +++++++++++++++----
 .../ratis/protocol/SetConfigurationRequest.java    |  2 +-
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  2 +-
 .../server/impl/TestRatisServerMetricsBase.java    |  2 +-
 13 files changed, 78 insertions(+), 23 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index ee15c5a..8decf96 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -96,7 +96,7 @@ class BlockingImpl implements BlockingApi {
         final RaftClientReply reply = sendRequest(request);
 
         if (reply != null) {
-          return reply;
+          return client.handleReply(request, reply);
         }
       } catch (GroupMismatchException | StateMachineException | TransferLeadershipException |
           LeaderSteppingDownException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index bb706e7..0379353 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -64,11 +64,12 @@ public interface ClientProtoUtils {
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
     return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
-        replyId.toByteString(), requestorId.getGroupId(), null, null, null, 0);
+        replyId.toByteString(), requestorId.getGroupId(), null, false, null, null, 0);
   }
 
+  @SuppressWarnings("parameternumber")
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId,
+      ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId, boolean toLeader,
       SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
     if (slidingWindowEntry == null) {
       slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
@@ -79,6 +80,7 @@ public interface ClientProtoUtils {
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
         .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
+        .setToLeader(toLeader)
         .setSlidingWindowEntry(slidingWindowEntry)
         .setTimeoutMs(timeoutMs);
 
@@ -89,11 +91,12 @@ public interface ClientProtoUtils {
     return b;
   }
 
+  @SuppressWarnings("parameternumber")
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
+      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, boolean toLeader,
       SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
     return toRaftRpcRequestProtoBuilder(
-        requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable,
+        requesterId.toByteString(), replyId.toByteString(), groupId, callId, toLeader, slidingWindowEntry, routingTable,
         timeoutMs);
   }
 
@@ -104,6 +107,7 @@ public interface ClientProtoUtils {
         request.getServerId(),
         request.getRaftGroupId(),
         request.getCallId(),
+        request.isToLeader(),
         request.getSlidingWindowEntry(),
         request.getRoutingTable(),
         request.getTimeoutMs());
@@ -151,9 +155,15 @@ public interface ClientProtoUtils {
     final RaftClientRequest.Type type = toRaftClientRequestType(p);
     final RaftRpcRequestProto request = p.getRpcRequest();
 
-    return RaftClientRequest.newBuilder()
-        .setClientId(ClientId.valueOf(request.getRequestorId()))
-        .setServerId(RaftPeerId.valueOf(request.getReplyId()))
+    final RaftClientRequest.Builder b = RaftClientRequest.newBuilder();
+
+    final RaftPeerId perrId = RaftPeerId.valueOf(request.getReplyId());
+    if (request.getToLeader()) {
+      b.setLeaderId(perrId);
+    } else {
+      b.setServerId(perrId);
+    }
+    return b.setClientId(ClientId.valueOf(request.getRequestorId()))
         .setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
         .setCallId(request.getCallId())
         .setMessage(toMessage(p.getMessage()))
@@ -211,7 +221,7 @@ public interface ClientProtoUtils {
       long seqNum, ByteString content) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
-            clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
+            clientId, serverId, groupId, callId, false, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
         .setWrite(WriteRequestTypeProto.getDefaultInstance())
         .setMessage(toClientMessageEntryProtoBuilder(content))
         .build();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 043181d..2c2a0e2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -193,6 +193,7 @@ public final class OrderedAsync {
       if (reply == null) {
         scheduleWithTimeout(pending, request, retryPolicy, null);
       } else {
+        client.handleReply(request, reply);
         f.complete(reply);
       }
     }).exceptionally(e -> {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9c3aac8..ce7150b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -38,6 +38,8 @@ import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
+import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
@@ -65,6 +67,11 @@ import java.util.function.Supplier;
 
 /** A client who sends requests to a raft service. */
 public final class RaftClientImpl implements RaftClient {
+  private static final Cache<RaftGroupId, RaftPeerId> LEADER_CACHE = CacheBuilder.newBuilder()
+      .expireAfterAccess(60, TimeUnit.SECONDS)
+      .maximumSize(1024)
+      .build();
+
   public abstract static class PendingClientRequest {
     private final long creationTimeInMs = System.currentTimeMillis();
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -139,6 +146,13 @@ public final class RaftClientImpl implements RaftClient {
     this.clientId = clientId;
     this.peers.set(group.getPeers());
     this.groupId = group.getGroupId();
+
+    if (leaderId == null) {
+      final RaftPeerId cached = LEADER_CACHE.getIfPresent(groupId);
+      if (cached != null && group.getPeer(cached) != null) {
+        leaderId = cached;
+      }
+    }
     this.leaderId = leaderId != null? leaderId : getHighestPriorityPeerId();
     this.retryPolicy = Objects.requireNonNull(retryPolicy, "retry policy can't be null");
 
@@ -204,9 +218,13 @@ public final class RaftClientImpl implements RaftClient {
   RaftClientRequest newRaftClientRequest(
       RaftPeerId server, long callId, Message message, RaftClientRequest.Type type,
       SlidingWindowEntry slidingWindowEntry) {
-    return RaftClientRequest.newBuilder()
-        .setClientId(clientId)
-        .setServerId(server != null? server: leaderId)
+    final RaftClientRequest.Builder b = RaftClientRequest.newBuilder();
+    if (server != null) {
+      b.setServerId(server);
+    } else {
+      b.setLeaderId(leaderId);
+    }
+    return b.setClientId(clientId)
         .setGroupId(groupId)
         .setCallId(callId)
         .setMessage(message)
@@ -254,6 +272,13 @@ public final class RaftClientImpl implements RaftClient {
     return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable);
   }
 
+  RaftClientReply handleReply(RaftClientRequest request, RaftClientReply reply) {
+    if (request.isToLeader() && reply != null && reply.getException() == null) {
+      LEADER_CACHE.put(reply.getRaftGroupId(), reply.getServerId());
+    }
+    return reply;
+  }
+
   static <E extends Throwable> RaftClientReply handleRaftException(
       RaftClientReply reply, Function<RaftException, E> converter) throws E {
     if (reply != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 43ed7a9..432acce 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -80,6 +80,7 @@ public interface UnorderedAsync {
         final RaftException replyException = reply != null? reply.getException(): null;
         reply = client.handleLeaderException(request, reply);
         if (reply != null) {
+          client.handleReply(request, reply);
           f.complete(reply);
           return;
         }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
index 567c2be..a62495e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
@@ -23,6 +23,6 @@ package org.apache.ratis.protocol;
  */
 public class GroupInfoRequest extends RaftClientRequest {
   public GroupInfoRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
-    super(clientId, serverId, groupId, callId, readRequestType());
+    super(clientId, serverId, groupId, callId, false, readRequestType());
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
index af38b6a..e28e7b1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
@@ -23,6 +23,6 @@ package org.apache.ratis.protocol;
  */
 public class GroupListRequest extends RaftClientRequest {
   public GroupListRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
-    super(clientId, serverId, groupId, callId, readRequestType());
+    super(clientId, serverId, groupId, callId, false, readRequestType());
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index 4385ba2..d370dfc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -92,7 +92,7 @@ public final class GroupManagementRequest extends RaftClientRequest {
   private final Op op;
 
   private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) {
-    super(clientId, serverId, op.getGroupId(), callId, writeRequestType());
+    super(clientId, serverId, op.getGroupId(), callId, false, writeRequestType());
     this.op = op;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 6f7a28b..8fef42d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -232,6 +232,7 @@ public class RaftClientRequest extends RaftClientMessage {
     private RaftPeerId serverId;
     private RaftGroupId groupId;
     private long callId;
+    private boolean toLeader;
 
     private Message message;
     private Type type;
@@ -241,7 +242,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
     public RaftClientRequest build() {
       return new RaftClientRequest(
-          clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable, timeoutMs);
+          clientId, serverId, groupId, callId, toLeader, message, type, slidingWindowEntry, routingTable, timeoutMs);
     }
 
     public Builder setClientId(ClientId clientId) {
@@ -249,8 +250,15 @@ public class RaftClientRequest extends RaftClientMessage {
       return this;
     }
 
+    public Builder setLeaderId(RaftPeerId leaderId) {
+      this.serverId = leaderId;
+      this.toLeader = true;
+      return this;
+    }
+
     public Builder setServerId(RaftPeerId serverId) {
       this.serverId = serverId;
+      this.toLeader = false;
       return this;
     }
 
@@ -316,21 +324,26 @@ public class RaftClientRequest extends RaftClientMessage {
 
   private final long timeoutMs;
 
-  protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
-    this(clientId, serverId, groupId, callId, null, type, null, null, 0);
+  private final boolean toLeader;
+
+  protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
+      boolean toLeader, Type type) {
+    this(clientId, serverId, groupId, callId, toLeader, null, type, null, null, 0);
   }
 
   protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type,
       long timeoutMs) {
-    this(clientId, serverId, groupId, callId, null, type, null, null, timeoutMs);
+    this(clientId, serverId, groupId, callId, true, null, type, null, null, timeoutMs);
   }
 
   @SuppressWarnings("parameternumber")
   private RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
-      long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
+      long callId, boolean toLeader, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
       RoutingTable routingTable, long timeoutMs) {
     super(clientId, serverId, groupId, callId);
+    this.toLeader = toLeader;
+
     this.message = message;
     this.type = type;
     this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
@@ -343,6 +356,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return true;
   }
 
+  public boolean isToLeader() {
+    return toLeader;
+  }
+
   public SlidingWindowEntry getSlidingWindowEntry() {
     return slidingWindowEntry;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index ae8219b..5a8fc21 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -27,7 +27,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, List<RaftPeer> peers) {
-    super(clientId, serverId, groupId, callId, writeRequestType());
+    super(clientId, serverId, groupId, callId, true, writeRequestType());
     this.peers = peers != null? Collections.unmodifiableList(peers): Collections.emptyList();
     Preconditions.assertUnique(this.peers);
   }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c98f0d7..092e18c 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -112,6 +112,7 @@ message RaftRpcRequestProto {
   bytes replyId = 2;
   RaftGroupIdProto raftGroupId = 3;
   uint64 callId = 4;
+  bool toLeader = 5;
 
   uint64 timeoutMs = 13;
   RoutingTableProto routingTable = 14;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 955bb42..5247d23 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -728,7 +728,7 @@ public abstract class MiniRaftCluster implements Closeable {
       ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
     return RaftClientRequest.newBuilder()
         .setClientId(clientId)
-        .setServerId(leaderId)
+        .setLeaderId(leaderId)
         .setGroupId(getGroupId())
         .setCallId(callId)
         .setMessage(message)
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
index 838db86..3104f08 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -60,7 +60,7 @@ public abstract class TestRatisServerMetricsBase<CLUSTER extends MiniRaftCluster
     // StaleRead with Long.MAX_VALUE minIndex will fail.
     RaftClientRequest r = RaftClientRequest.newBuilder()
         .setClientId(clientId)
-        .setServerId(leaderImpl.getId())
+        .setLeaderId(leaderImpl.getId())
         .setGroupId(cluster.getGroupId())
         .setCallId(0)
         .setMessage(Message.EMPTY)