You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/15 02:29:18 UTC
[ratis] branch master updated: RATIS-1438. Add request timeout to ratis Streaming (#563)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f349fdf RATIS-1438. Add request timeout to ratis Streaming (#563)
f349fdf is described below
commit f349fdf2488e254d0fa2eeb17bbaf44de2b0932d
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Dec 15 10:29:13 2021 +0800
RATIS-1438. Add request timeout to ratis Streaming (#563)
---
.../apache/ratis/client/RaftClientConfigKeys.java | 10 ++++++++++
.../ratis/client/impl/OrderedStreamAsync.java | 23 ++++++++++++++++++++++
2 files changed, 33 insertions(+)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 5fb5dff..c8388bb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -103,6 +103,16 @@ public interface RaftClientConfigKeys {
static void setOutstandingRequestsMax(RaftProperties properties, int outstandingRequests) {
setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, outstandingRequests);
}
+
+ String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
+ TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ static TimeDuration requestTimeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(REQUEST_TIMEOUT_DEFAULT.getUnit()),
+ REQUEST_TIMEOUT_KEY, REQUEST_TIMEOUT_DEFAULT, getDefaultLog());
+ }
+ static void setRequestTimeout(RaftProperties properties, TimeDuration timeoutDuration) {
+ setTimeDuration(properties::setTimeDuration, REQUEST_TIMEOUT_KEY, timeoutDuration);
+ }
}
interface MessageStream {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 52606e1..ffe1b1e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -28,9 +28,12 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.SlidingWindow;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,16 +95,24 @@ public class OrderedStreamAsync {
public CompletableFuture<DataStreamReply> getReplyFuture(){
return replyFuture;
}
+
+ @Override
+ public String toString() {
+ return JavaUtils.getClassSimpleName(getClass()) + ":seqNum=" + seqNum + "," + header;
+ }
}
private final DataStreamClientRpc dataStreamClientRpc;
private final SlidingWindow.Client<DataStreamWindowRequest, DataStreamReply> slidingWindow;
private final Semaphore requestSemaphore;
+ private final TimeDuration requestTimeout;
+ private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
OrderedStreamAsync(ClientId clientId, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){
this.dataStreamClientRpc = dataStreamClientRpc;
this.slidingWindow = new SlidingWindow.Client<>(clientId);
this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
+ this.requestTimeout = RaftClientConfigKeys.DataStream.requestTimeout(properties);
}
CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
@@ -133,6 +144,9 @@ public class OrderedStreamAsync {
final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(
request.getDataStreamRequest());
long seqNum = request.getSeqNum();
+
+ scheduleWithTimeout(request);
+
requestFuture.thenApply(reply -> {
slidingWindow.receiveReply(
seqNum, reply, this::sendRequestToNetwork);
@@ -147,4 +161,13 @@ public class OrderedStreamAsync {
return null;
});
}
+
+ private void scheduleWithTimeout(DataStreamWindowRequest request) {
+ scheduler.onTimeout(requestTimeout, () -> {
+ if (!request.getReplyFuture().isDone()) {
+ request.getReplyFuture().completeExceptionally(
+ new TimeoutIOException("Timeout " + requestTimeout + ": Failed to send " + request));
+ }
+ }, LOG, () -> "Failed to completeExceptionally for " + request);
+ }
}