You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/15 02:29:18 UTC

[ratis] branch master updated: RATIS-1438. Add request timeout to ratis Streaming (#563)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f349fdf  RATIS-1438. Add request timeout to ratis Streaming (#563)
f349fdf is described below

commit f349fdf2488e254d0fa2eeb17bbaf44de2b0932d
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Dec 15 10:29:13 2021 +0800

    RATIS-1438. Add request timeout to ratis Streaming (#563)
---
 .../apache/ratis/client/RaftClientConfigKeys.java  | 10 ++++++++++
 .../ratis/client/impl/OrderedStreamAsync.java      | 23 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 5fb5dff..c8388bb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -103,6 +103,16 @@ public interface RaftClientConfigKeys {
     static void setOutstandingRequestsMax(RaftProperties properties, int outstandingRequests) {
       setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, outstandingRequests);
     }
+
+    String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+    TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+    static TimeDuration requestTimeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+          REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT, getDefaultLog());
+    }
+    static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+      setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
+    }
   }
 
   interface MessageStream {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 52606e1..ffe1b1e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -28,9 +28,12 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,16 +95,24 @@ public class OrderedStreamAsync {
     public CompletableFuture<DataStreamReply> getReplyFuture(){
       return replyFuture;
     }
+
+    @Override
+    public String toString() {
+      return JavaUtils.getClassSimpleName(getClass()) + ":seqNum=" + seqNum + "," + header;
+    }
   }
 
   private final DataStreamClientRpc dataStreamClientRpc;
   private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow;
   private final Semaphore requestSemaphore;
+  private final TimeDuration requestTimeout;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   OrderedStreamAsync(ClientId clientId, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
     this.dataStreamClientRpc = dataStreamClientRpc;
     this.slidingWindow = new SlidingWindow.Client<>(clientId);
     this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
+    this.requestTimeout = RaftClientConfigKeys.DataStream.requestTimeout(properties);
   }
 
   CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
@@ -133,6 +144,9 @@ public class OrderedStreamAsync {
     final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(
         request.getDataStreamRequest());
     long seqNum = request.getSeqNum();
+
+    scheduleWithTimeout(request);
+
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
           seqNum, reply, this::sendRequestToNetwork);
@@ -147,4 +161,13 @@ public class OrderedStreamAsync {
       return null;
     });
   }
+
+  private void scheduleWithTimeout(DataStreamWindowRequest request) {
+    scheduler.onTimeout(requestTimeout, () -> {
+      if (!request.getReplyFuture().isDone()) {
+        request.getReplyFuture().completeExceptionally(
+            new TimeoutIOException("Timeout " + requestTimeout + ": Failed to send " + request));
+      }
+    }, LOG, () -> "Failed to completeExceptionally for " + request);
+  }
 }