You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/12/20 03:57:04 UTC
[ratis] branch master updated: RATIS-1759. Support client use linearizable read per request (#798)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 684a19aa1 RATIS-1759. Support client use linearizable read per request (#798)
684a19aa1 is described below
commit 684a19aa1af4c467de2be068234170a6a2ba292f
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Tue Dec 20 11:56:47 2022 +0800
RATIS-1759. Support client use linearizable read per request (#798)
* Support client use linearizable read per request
* add new interface: sendReadOnlyNonLinearizable
---
.../src/main/java/org/apache/ratis/client/api/AsyncApi.java | 11 +++++++++++
.../main/java/org/apache/ratis/client/api/BlockingApi.java | 11 +++++++++++
.../src/main/java/org/apache/ratis/client/impl/AsyncImpl.java | 5 +++++
.../main/java/org/apache/ratis/client/impl/BlockingImpl.java | 5 +++++
.../java/org/apache/ratis/protocol/RaftClientRequest.java | 8 +++++++-
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 6 ++++--
7 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 84a4f5437..c6f5e4181 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -55,6 +55,17 @@ public interface AsyncApi {
*/
CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server);
+ /**
+ * Send the given readonly message asynchronously to the raft service using non-linearizable read.
+ * This method is useful when linearizable read is enabled
+ * but this client prefers not to use it for performance reasons.
+ * When linearizable read is disabled, this method is the same as {@link #sendReadOnly(Message)}.
+ *
+ * @param message The request message.
+ * @return a future of the reply.
+ */
+ CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message message);
+
/** The same as sendReadOnlyUnordered(message, null). */
default CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message) {
return sendReadOnlyUnordered(message, null);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
index 4a5237afc..dc03e1b8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java
@@ -54,6 +54,17 @@ public interface BlockingApi {
*/
RaftClientReply sendReadOnly(Message message, RaftPeerId server) throws IOException;
+ /**
+ * Send the given readonly message to the raft service using non-linearizable read.
+ * This method is useful when linearizable read is enabled
+ * but this client prefers not to use it for performance reasons.
+ * When linearizable read is disabled, this method is the same as {@link #sendReadOnly(Message)}.
+ *
+ * @param message The request message.
+ * @return the reply.
+ */
+ RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException;
+
/**
* Send the given stale-read message to the given server (not the raft service).
* If the server commit index is larger than or equal to the given min-index, the request will be processed.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 4672f5ecf..9bdc9d50a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -51,6 +51,11 @@ class AsyncImpl implements AsyncRpcApi {
return send(RaftClientRequest.readRequestType(), message, server);
}
+ @Override
+ public CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message message) {
+ return send(RaftClientRequest.readRequestType(true), message, null);
+ }
+
@Override
public CompletableFuture<RaftClientReply> sendReadOnlyUnordered(Message message, RaftPeerId server) {
return UnorderedAsync.send(RaftClientRequest.readRequestType(), message, server, client);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index ea06cdca8..7e81baf8d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -64,6 +64,11 @@ class BlockingImpl implements BlockingApi {
return send(RaftClientRequest.readRequestType(), message, server);
}
+ @Override
+ public RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException {
+ return send(RaftClientRequest.readRequestType(true), message, null);
+ }
+
@Override
public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 8fef42d8a..ae76607ff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -36,6 +36,8 @@ public class RaftClientRequest extends RaftClientMessage {
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance());
+ private static final Type
+ READ_NONLINEARIZABLE_DEFAULT = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
public static Type writeRequestType() {
@@ -62,6 +64,10 @@ public class RaftClientRequest extends RaftClientMessage {
return READ_DEFAULT;
}
+ public static Type readRequestType(boolean nonLinearizable) {
+ return nonLinearizable? READ_NONLINEARIZABLE_DEFAULT: readRequestType();
+ }
+
public static Type staleReadRequestType(long minIndex) {
return minIndex == 0L? STALE_READ_DEFAULT
: new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
@@ -89,7 +95,7 @@ public class RaftClientRequest extends RaftClientMessage {
}
public static Type valueOf(ReadRequestTypeProto read) {
- return READ_DEFAULT;
+ return read.getPreferNonLinearizable()? READ_NONLINEARIZABLE_DEFAULT: READ_DEFAULT;
}
public static Type valueOf(StaleReadRequestTypeProto staleRead) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d90e57932..b83c375c6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -294,6 +294,7 @@ message ForwardRequestTypeProto {
}
message ReadRequestTypeProto {
+ bool preferNonLinearizable = 1;
}
message StaleReadRequestTypeProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d6224bf44..d04f3126d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -935,7 +935,8 @@ class RaftServerImpl implements RaftServer.Division,
}
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
- if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+ if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE
+ && !request.getType().getRead().getPreferNonLinearizable()) {
/*
Linearizable read using ReadIndex. See Raft paper section 6.4.
1. First obtain readIndex from Leader.
@@ -962,7 +963,8 @@ class RaftServerImpl implements RaftServer.Division,
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
- } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+ } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT
+ || request.getType().getRead().getPreferNonLinearizable()) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;