You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/12/20 03:57:04 UTC

[ratis] branch master updated: RATIS-1759. Support client use linearizable read per request (#798)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 684a19aa1 RATIS-1759. Support client use linearizable read per request (#798)
684a19aa1 is described below

commit 684a19aa1af4c467de2be068234170a6a2ba292f
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Tue Dec 20 11:56:47 2022 +0800

    RATIS-1759. Support client use linearizable read per request (#798)
    
    * Support client use linearizable read per request
    
    * add new interface: sendReadOnlyNonLinearizable
---
 .../src/main/java/org/apache/ratis/client/api/AsyncApi.java   | 11 +++++++++++
 .../main/java/org/apache/ratis/client/api/BlockingApi.java    | 11 +++++++++++
 .../src/main/java/org/apache/ratis/client/impl/AsyncImpl.java |  5 +++++
 .../main/java/org/apache/ratis/client/impl/BlockingImpl.java  |  5 +++++
 .../java/org/apache/ratis/protocol/RaftClientRequest.java     |  8 +++++++-
 ratis-proto/src/main/proto/Raft.proto                         |  1 +
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java     |  6 ++++--
 7 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 84a4f5437..c6f5e4181 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -55,6 +55,17 @@ public interface AsyncApi {
    */
   CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server);
 
+  /**
+   * Send the given readonly message asynchronously to the raft service using non-linearizable read.
+   * This method is useful when linearizable read is enabled
+   * but this client prefers not using it for performance reason.
+   * When linearizable read is disabled, this method is the same as {@link #sendReadOnly(Message)}.
+   *
+   * @param message The request message.
+   * @return a future of the reply.
+   */
+  CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message message);
+
   /** The same as sendReadOnlyUnordered(message, null). */
   default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message) {
     return sendReadOnlyUnordered(message, null);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 4a5237afc..dc03e1b8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -54,6 +54,17 @@ public interface BlockingApi {
    */
   RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws IOException;
 
+  /**
+   * Send the given readonly message to the raft service using non-linearizable read.
+   * This method is useful when linearizable read is enabled
+   * but this client prefers not using it for performance reason.
+   * When linearizable read is disabled, this method is the same as {@link #sendReadOnly(Message)}.
+   *
+   * @param message The request message.
+   * @return the reply.
+   */
+  RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException;
+
   /**
    * Send the given stale-read message to the given server (not the raft service).
    * If the server commit index is larger than or equal to the given min-index, the request will be processed.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 4672f5ecf..9bdc9d50a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi {
     return send(RaftClientRequest.readRequestType(), message, server);
   }
 
+  @Override
+  public CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message message) {
+    return send(RaftClientRequest.readRequestType(true), message, null);
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server) {
     return UnorderedAsync.send(RaftClientRequest.readRequestType(), message, server, client);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index ea06cdca8..7e81baf8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -64,6 +64,11 @@ class BlockingImpl implements BlockingApi {
     return send(RaftClientRequest.readRequestType(), message, server);
   }
 
+  @Override
+  public RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException {
+    return send(RaftClientRequest.readRequestType(true), message, null);
+  }
+
   @Override
   public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
       throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 8fef42d8a..ae76607ff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -36,6 +36,8 @@ public class RaftClientRequest extends RaftClientMessage {
       WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
 
   private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance());
+  private static final Type
+      READ_NONLINEARIZABLE_DEFAULT = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
   private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
 
   public static Type writeRequestType() {
@@ -62,6 +64,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return READ_DEFAULT;
   }
 
+  public static Type readRequestType(boolean nonLinearizable) {
+    return nonLinearizable? READ_NONLINEARIZABLE_DEFAULT: readRequestType();
+  }
+
   public static Type staleReadRequestType(long minIndex) {
     return minIndex == 0L? STALE_READ_DEFAULT
         : new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
@@ -89,7 +95,7 @@ public class RaftClientRequest extends RaftClientMessage {
     }
 
     public static Type valueOf(ReadRequestTypeProto read) {
-      return READ_DEFAULT;
+      return read.getPreferNonLinearizable()? READ_NONLINEARIZABLE_DEFAULT: READ_DEFAULT;
     }
 
     public static Type valueOf(StaleReadRequestTypeProto staleRead) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d90e57932..b83c375c6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -294,6 +294,7 @@ message ForwardRequestTypeProto {
 }
 
 message ReadRequestTypeProto {
+  bool preferNonLinearizable = 1;
 }
 
 message StaleReadRequestTypeProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d6224bf44..d04f3126d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -935,7 +935,8 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
-    if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+    if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE
+        && !request.getType().getRead().getPreferNonLinearizable()) {
       /*
         Linearizable read using ReadIndex. See Raft paper section 6.4.
         1. First obtain readIndex from Leader.
@@ -962,7 +963,8 @@ class RaftServerImpl implements RaftServer.Division,
           .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
           .thenCompose(readIndex -> queryStateMachine(request))
           .exceptionally(e -> readException2Reply(request, e));
-    } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+    } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT
+        || request.getType().getRead().getPreferNonLinearizable()) {
        CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
        if (reply != null) {
          return reply;