You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/22 15:25:12 UTC

[ratis] branch master updated: RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new beb3e3d  RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
beb3e3d is described below

commit beb3e3ded54432ae23a80e35b5abb1e5d466b08e
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Dec 22 23:25:08 2021 +0800

    RATIS-1475. Use the stream level SlidingWindow.client in OrderedStreamAsync (#568)
---
 .../apache/ratis/client/impl/DataStreamClientImpl.java   |  7 +++++--
 .../org/apache/ratis/client/impl/OrderedStreamAsync.java | 16 ++++++++--------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 29356f6..7e04d2f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.SlidingWindow;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,12 +67,13 @@ public class DataStreamClientImpl implements DataStreamClient {
     this.groupId = groupId;
     this.dataStreamServer = dataStreamServer;
     this.dataStreamClientRpc = dataStreamClientRpc;
-    this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
+    this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
   }
 
   public final class DataStreamOutputImpl implements DataStreamOutputRpc {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
+    private final SlidingWindow.Client<OrderedStreamAsync.DataStreamWindowRequest, DataStreamReply> slidingWindow;
     private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
     private CompletableFuture<DataStreamReply> closeFuture;
     private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
@@ -103,6 +105,7 @@ public class DataStreamClientImpl implements DataStreamClient {
 
     private DataStreamOutputImpl(RaftClientRequest request) {
       this.header = request;
+      this.slidingWindow = new SlidingWindow.Client<>(ClientInvocationId.valueOf(clientId, header.getCallId()));
       final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
       this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
     }
@@ -110,7 +113,7 @@ public class DataStreamClientImpl implements DataStreamClient {
     private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
       final DataStreamRequestHeader h =
           new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options);
-      return orderedStreamAsync.sendRequest(h, data);
+      return orderedStreamAsync.sendRequest(h, data, slidingWindow);
     }
 
     private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index ffe1b1e..d9f5862 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -24,7 +24,6 @@ import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
-import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
@@ -103,19 +102,19 @@ public class OrderedStreamAsync {
   }
 
   private final DataStreamClientRpc dataStreamClientRpc;
-  private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow;
+
   private final Semaphore requestSemaphore;
   private final TimeDuration requestTimeout;
   private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
-  OrderedStreamAsync(ClientId clientId, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
+  OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
-    this.slidingWindow = new SlidingWindow.Client<>(clientId);
     this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
     this.requestTimeout = RaftClientConfigKeys.DataStream.requestTimeout(properties);
   }
 
-  CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data,
+      SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow) {
     try {
       requestSemaphore.acquire();
     } catch (InterruptedException e){
@@ -124,7 +123,7 @@ public class OrderedStreamAsync {
     }
     final LongFunction<DataStreamWindowRequest> constructor
         = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
-    return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
+    return slidingWindow.submitNewRequest(constructor, r -> sendRequestToNetwork(r, slidingWindow)).
            getReplyFuture().whenComplete((r, e) -> {
              if (e != null) {
                LOG.error("Failed to send request, header=" + header, e);
@@ -133,7 +132,8 @@ public class OrderedStreamAsync {
            });
   }
 
-  private void sendRequestToNetwork(DataStreamWindowRequest request){
+  private void sendRequestToNetwork(DataStreamWindowRequest request,
+      SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow) {
     CompletableFuture<DataStreamReply> f = request.getReplyFuture();
     if(f.isDone()) {
       return;
@@ -149,7 +149,7 @@ public class OrderedStreamAsync {
 
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
-          seqNum, reply, this::sendRequestToNetwork);
+          seqNum, reply, r -> sendRequestToNetwork(r, slidingWindow));
       return reply;
     }).thenAccept(reply -> {
       if (f.isDone()) {