You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/22 15:25:12 UTC
[ratis] branch master updated: RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new beb3e3d RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
beb3e3d is described below
commit beb3e3ded54432ae23a80e35b5abb1e5d466b08e
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Dec 22 23:25:08 2021 +0800
RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
---
.../apache/ratis/client/impl/DataStreamClientImpl.java | 7 +++++--
.../org/apache/ratis/client/impl/OrderedStreamAsync.java | 16 ++++++++--------
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 29356f6..7e04d2f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.SlidingWindow;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -66,12 +67,13 @@ public class DataStreamClientImpl implements DataStreamClient {
this.groupId = groupId;
this.dataStreamServer = dataStreamServer;
this.dataStreamClientRpc = dataStreamClientRpc;
- this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
+ this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
}
public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
+ private final SlidingWindow.Client<OrderedStreamAsync.DataStreamWindowRequest, DataStreamReply> slidingWindow;
private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
private CompletableFuture<DataStreamReply> closeFuture;
private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
@@ -103,6 +105,7 @@ public class DataStreamClientImpl implements DataStreamClient {
private DataStreamOutputImpl(RaftClientRequest request) {
this.header = request;
+ this.slidingWindow = new SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, header.getCallId()));
final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
}
@@ -110,7 +113,7 @@ public class DataStreamClientImpl implements DataStreamClient {
private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
final DataStreamRequestHeader h =
new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options);
- return orderedStreamAsync.sendRequest(h, data);
+ return orderedStreamAsync.sendRequest(h, data, slidingWindow);
}
private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ffe1b1e..d9f5862 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -24,7 +24,6 @@ import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.io.FilePositionCount;
-import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -103,19 +102,19 @@ public class OrderedStreamAsync {
}
private final DataStreamClientRpc dataStreamClientRpc;
- private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow;
+
private final Semaphore requestSemaphore;
private final TimeDuration requestTimeout;
private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
- OrderedStreamAsync(ClientId clientId, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
+ OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
- this.slidingWindow = new SlidingWindow.Client<>(clientId);
this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
this.requestTimeout = RaftClientConfigKeys.DataStream.requestTimeout(properties);
}
- CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
+ CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data,
+ SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow) {
try {
requestSemaphore.acquire();
} catch (InterruptedException e){
@@ -124,7 +123,7 @@ public class OrderedStreamAsync {
}
final LongFunction<DataStreamWindowRequest> constructor
= seqNum -> new DataStreamWindowRequest(header, data, seqNum);
- return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
+ return slidingWindow.submitNewRequest(constructor, r -> sendRequestToNetwork(r, slidingWindow)).
getReplyFuture().whenComplete((r, e) -> {
if (e != null) {
LOG.error("Failed to send request, header=" + header, e);
@@ -133,7 +132,8 @@ public class OrderedStreamAsync {
});
}
- private void sendRequestToNetwork(DataStreamWindowRequest request){
+ private void sendRequestToNetwork(DataStreamWindowRequest request,
+ SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow) {
CompletableFuture<DataStreamReply> f = request.getReplyFuture();
if(f.isDone()) {
return;
@@ -149,7 +149,7 @@ public class OrderedStreamAsync {
requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
- seqNum, reply, this::sendRequestToNetwork);
+ seqNum, reply, r -> sendRequestToNetwork(r, slidingWindow));
return reply;
}).thenAccept(reply -> {
if (f.isDone()) {