You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/07 04:33:00 UTC
[ratis] branch master updated: RATIS-1807. Support timeout in gRPC. (#842)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 69263c884 RATIS-1807. Support timeout in gRPC. (#842)
69263c884 is described below
commit 69263c884573b87eada10bdd269c9e9d31fb2e94
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 6 20:32:54 2023 -0800
RATIS-1807. Support timeout in gRPC. (#842)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 7 +-
.../grpc/server/GrpcServerProtocolClient.java | 7 +-
.../grpc/util/ResponseNotifyClientInterceptor.java | 72 ++++++++++++
.../ratis/grpc/util/StreamObserverWithTimeout.java | 95 ++++++++++++++++
ratis-proto/src/main/proto/Test.proto | 37 +++++++
.../org/apache/ratis/grpc/util/GrpcTestClient.java | 123 +++++++++++++++++++++
.../org/apache/ratis/grpc/util/GrpcTestServer.java | 108 ++++++++++++++++++
.../grpc/util/TestStreamObserverWithTimeout.java | 122 ++++++++++++++++++++
8 files changed, 566 insertions(+), 5 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index a61726177..1b954d6a7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -594,7 +594,8 @@ public class GrpcLogAppender extends LogAppenderBase {
StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
final String requestId = UUID.randomUUID().toString();
try {
- snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+ snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
+ requestTimeoutDuration, responseHandler);
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
if (isRunning()) {
snapshotRequestObserver.onNext(request);
@@ -644,7 +645,9 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
}
try {
- snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+ snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
+ requestTimeoutDuration, responseHandler);
+
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime(false);
responseHandler.addPending(request);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 4c28c1df4..c3f8730e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -136,9 +137,9 @@ public class GrpcServerProtocolClient implements Closeable {
}
StreamObserver<InstallSnapshotRequestProto> installSnapshot(
- StreamObserver<InstallSnapshotReplyProto> responseHandler) {
- return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .installSnapshot(responseHandler);
+ String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+ return StreamObserverWithTimeout.newInstance(name, timeout,
+ i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
}
// short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
new file mode 100644
index 000000000..77577b06d
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Consumer;
+
+/**
+ * Invoke the given notifier when receiving a response.
+ */
+public class ResponseNotifyClientInterceptor implements ClientInterceptor {
+ public static final Logger LOG = LoggerFactory.getLogger(ResponseNotifyClientInterceptor.class);
+
+ private final Consumer<Object> notifier;
+
+ public ResponseNotifyClientInterceptor(Consumer<Object> notifier) {
+ this.notifier = notifier;
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+ LOG.debug("callOptions {}", callOptions);
+ return new Call<>(next.newCall(method, callOptions));
+ }
+
+ private final class Call<ReqT, RespT>
+ extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
+
+ private Call(ClientCall<ReqT, RespT> delegate) {
+ super(delegate);
+ }
+
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ LOG.debug("start {}", headers);
+ super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+ @Override
+ public void onMessage(RespT message) {
+ LOG.debug("onMessage {}", message);
+ notifier.accept(message);
+ super.onMessage(message);
+ }
+ }, headers);
+ }
+ }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
new file mode 100644
index 000000000..2b875f3ed
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+
+/**
+ * A request {@link StreamObserver} which fails the stream with a {@link TimeoutIOException}
+ * when a request is not answered in time.
+ * <p>
+ * Each {@link #onNext(Object)} forwards the request to the underlying observer
+ * and then schedules a check to be run after the timeout.
+ * The check passes when the number of responses counted so far has reached
+ * the sequence number of that request.
+ * Therefore, the timeout applies to each request individually but not to the stream as a whole.
+ * <p>
+ * NOTE(review): the check presumes that the server sends one response per request,
+ * in request order -- confirm for each RPC using this class.
+ *
+ * @param <T> the request type.
+ */
+public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
+  public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
+
+  /**
+   * Create a new instance whose response count is incremented by a {@link ResponseNotifyClientInterceptor}.
+   *
+   * @param name a name used in the log and the exception messages.
+   * @param timeout the time allowed for each request to be answered.
+   * @param newStreamObserver creates the underlying request observer from the given interceptor;
+   *                          responses are counted only through that interceptor.
+   * @return a new observer wrapping the observer created by the given function.
+   */
+  public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+      Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
+    final AtomicInteger responseCount = new AtomicInteger();
+    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
+        r -> responseCount.getAndIncrement());
+    return new StreamObserverWithTimeout<>(
+        name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+  }
+
+  private final String name;
+  private final TimeDuration timeout;
+  private final StreamObserver<T> observer;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
+  /** Set by the first {@link #onError(Throwable)} or {@link #onCompleted()}. */
+  private final AtomicBoolean isClose = new AtomicBoolean();
+  /** The number of requests forwarded so far; its value after an increment is the request id. */
+  private final AtomicInteger requestCount = new AtomicInteger();
+  /** The number of responses received so far. */
+  private final IntSupplier responseCount;
+
+  private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
+      StreamObserver<T> observer) {
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.timeout = timeout;
+    this.responseCount = responseCount;
+    this.observer = observer;
+  }
+
+  /**
+   * Forward the given request and then schedule a timeout check for it.
+   * This method does not check {@link #isClose};
+   * the request is always passed to the underlying observer.
+   */
+  @Override
+  public void onNext(T request) {
+    observer.onNext(request);
+    final int id = requestCount.incrementAndGet();
+    // NOTE(review): presumably the scheduler runs the task once the timeout has elapsed
+    // and uses the last two arguments to log a failure of the task -- confirm in TimeoutExecutor.
+    scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
+        LOG, () -> name + ": Timeout check failed for request: " + request);
+  }
+
+  /**
+   * Fail the stream if fewer than {@code id} responses have been received.
+   *
+   * @param id the sequence number of the request, starting from 1.
+   * @param request the request, used only in the exception message.
+   */
+  private void handleTimeout(int id, T request) {
+    // Compare by subtraction instead of by "id > count":
+    // both counters are wrapping ints, and the difference remains correct after an overflow.
+    if (id - responseCount.getAsInt() > 0) {
+      onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
+    }
+  }
+
+  /** Forward the error unless {@link #onError(Throwable)} or {@link #onCompleted()} was already called. */
+  @Override
+  public void onError(Throwable throwable) {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onError(throwable);
+    }
+  }
+
+  /** Forward the completion unless {@link #onError(Throwable)} or {@link #onCompleted()} was already called. */
+  @Override
+  public void onCompleted() {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onCompleted();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-proto/src/main/proto/Test.proto b/ratis-proto/src/main/proto/Test.proto
new file mode 100644
index 000000000..8d5769ff3
--- /dev/null
+++ b/ratis-proto/src/main/proto/Test.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.ratis.test.proto";
+option java_outer_classname = "TestProto";
+
+package org.apache.ratis.test;
+
+// Test-only service.
+// Hello is a bidirectional streaming RPC: both the requests and the replies are streams.
+service Greeter {
+ rpc Hello (stream HelloRequest)
+ returns (stream HelloReply) {}
+}
+
+// A request of Greeter.Hello.
+message HelloRequest {
+ // The name to be greeted.
+ string name = 1;
+}
+
+// A reply of Greeter.Hello.
+message HelloReply {
+ // The greeting text.
+ string message = 1;
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
new file mode 100644
index 000000000..0923b27fe
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.GreeterGrpc.GreeterStub;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Deadline;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/**
+ * gRPC client for testing.
+ * <p>
+ * The client opens a single Hello stream in the constructor.
+ * Each {@link #send(String)} adds a future to a FIFO queue
+ * and each reply completes the future at the head of the queue.
+ * NOTE(review): this matching presumes one reply per request, in request order,
+ * and that {@link #send(String)} is not called concurrently -- confirm with the callers.
+ */
+class GrpcTestClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestClient.class);
+
+  /** Creates the request observer of a Hello call from a stub and a response handler. */
+  @FunctionalInterface
+  interface StreamObserverFactory
+      extends BiFunction<GreeterStub, StreamObserver<HelloReply>, StreamObserver<HelloRequest>> {
+  }
+
+  /**
+   * @return a factory using a gRPC deadline for the entire call.
+   *         Note that the deadline is computed when this method is invoked,
+   *         not when the call is started.
+   */
+  static StreamObserverFactory withDeadline(TimeDuration timeout) {
+    final Deadline d = Deadline.after(timeout.getDuration(), timeout.getUnit());
+    return (stub, responseHandler) -> stub.withDeadline(d).hello(responseHandler);
+  }
+
+  /** @return a factory using a {@link StreamObserverWithTimeout} with the given timeout. */
+  static StreamObserverFactory withTimeout(TimeDuration timeout) {
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+        i -> stub.withInterceptors(i).hello(responseHandler));
+  }
+
+  private final ManagedChannel channel;
+  private final StreamObserver<HelloRequest> requestHandler;
+  /** The futures of the requests which have been sent but not yet replied, in sending order. */
+  private final Queue<CompletableFuture<String>> replies = new ConcurrentLinkedQueue<>();
+
+  GrpcTestClient(String host, int port, StreamObserverFactory factory) {
+    this.channel = ManagedChannelBuilder.forAddress(host, port)
+        .usePlaintext()
+        .build();
+
+    final GreeterStub asyncStub = GreeterGrpc.newStub(channel);
+    final StreamObserver<HelloReply> responseHandler = new StreamObserver<HelloReply>() {
+      @Override
+      public void onNext(HelloReply helloReply) {
+        final CompletableFuture<String> pending = replies.poll();
+        if (pending == null) {
+          // Fail with a descriptive message instead of a bare NullPointerException.
+          throw new IllegalStateException(
+              "Unexpected reply without a pending request: " + helloReply.getMessage());
+        }
+        pending.complete(helloReply.getMessage());
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.info("onError", throwable);
+        completeExceptionally(throwable);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.info("onCompleted");
+        completeExceptionally(new IllegalStateException("onCompleted"));
+      }
+
+      /** Fail all the pending futures with the given throwable. */
+      void completeExceptionally(Throwable throwable) {
+        // Drain with poll() so that every future removed from the queue is also completed.
+        // A forEach(..) followed by clear() could drop a future added in between.
+        for (CompletableFuture<String> f; (f = replies.poll()) != null; ) {
+          f.completeExceptionally(throwable);
+        }
+      }
+    };
+
+    this.requestHandler = factory.apply(asyncStub, responseHandler);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      /* requestHandler.onCompleted() is intentionally not called here:
+       * after the request handler is cancelled, no more life-cycle hooks are allowed,
+       * see {@link org.apache.ratis.thirdparty.io.grpc.ClientCall#cancel(String, Throwable)} */
+      channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  /**
+   * Send a request with the given name.
+   *
+   * @return a future of the reply message.  It is completed exceptionally
+   *         if the stream fails or if the request handler rejects the request.
+   */
+  CompletableFuture<String> send(String name) {
+    LOG.info("send {}", name);
+    final HelloRequest request = HelloRequest.newBuilder().setName(name).build();
+    final CompletableFuture<String> f = new CompletableFuture<>();
+    // Enqueue before sending so that a fast reply always finds its future in the queue.
+    replies.offer(f);
+    try {
+      requestHandler.onNext(request);
+    } catch (IllegalStateException e) {
+      // already closed: the request was not sent, so no reply will arrive for this future.
+      replies.remove(f);
+      f.completeExceptionally(e);
+    }
+    return f;
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
new file mode 100644
index 000000000..ec9d63b13
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * gRPC server for testing.
+ * It serves a {@link GreeterImpl} which sleeps before sending each reply.
+ */
+class GrpcTestServer implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestServer.class);
+
+  private final Server server;
+
+  /**
+   * @param port the port to listen on.
+   * @param slow the number of requests to be replied after a short sleep;
+   *             the requests afterward are replied after a long sleep.
+   * @param timeout the base duration for computing the short and the long sleep times.
+   */
+  GrpcTestServer(int port, int slow, TimeDuration timeout) {
+    this.server = ServerBuilder.forPort(port)
+        .addService(new GreeterImpl(slow, timeout))
+        .build();
+  }
+
+  /** Start the server and return the port reported by the server. */
+  int start() throws IOException {
+    server.start();
+    return server.getPort();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  /**
+   * A Greeter which replies "count) Hello name" to each request after a sleep.
+   * The count is a plain int field without any synchronization
+   * and it is shared by all the streams of this service instance.
+   */
+  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
+    static String toReplySuffix(String request) {
+      return ") Hello " + request;
+    }
+
+    private final int slow;
+    /** A quarter of the timeout. */
+    private final TimeDuration shortSleepTime;
+    /** Twice the timeout. */
+    private final TimeDuration longSleepTime;
+    /** The number of replies sent so far. */
+    private int count = 0;
+
+    GreeterImpl(int slow, TimeDuration timeout) {
+      this.slow = slow;
+      this.shortSleepTime = timeout.multiply(0.25);
+      this.longSleepTime = timeout.multiply(2);
+    }
+
+    @Override
+    public StreamObserver<HelloRequest> hello(StreamObserver<HelloReply> responseObserver) {
+      return new StreamObserver<HelloRequest>() {
+        @Override
+        public void onNext(HelloRequest helloRequest) {
+          final String reply = count + toReplySuffix(helloRequest.getName());
+          // Sleep briefly while count < slow; sleep longer than the timeout afterward.
+          final TimeDuration sleepTime = count < slow ? shortSleepTime : longSleepTime;
+          LOG.info("count = {}, slow = {}, sleep {}", reply, slow, sleepTime);
+          try {
+            sleepTime.sleep();
+          } catch (InterruptedException e) {
+            // Restore the interrupt status, as in close(), before failing the stream.
+            Thread.currentThread().interrupt();
+            responseObserver.onError(e);
+            return;
+          }
+          responseObserver.onNext(HelloReply.newBuilder().setMessage(reply).build());
+          count++;
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          LOG.error("onError", throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          responseObserver.onCompleted();
+        }
+      };
+    }
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
new file mode 100644
index 000000000..dac58812d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.grpc.util.GrpcTestClient.StreamObserverFactory;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+/**
+ * Compare a gRPC deadline, which limits the entire call,
+ * with {@link StreamObserverWithTimeout}, which limits each request.
+ */
+public class TestStreamObserverWithTimeout extends BaseTest {
+  {
+    Slf4jUtils.setLogLevel(ResponseNotifyClientInterceptor.LOG, Level.TRACE);
+  }
+
+  /** The ways to create the request observer used by the test client. */
+  enum Type {
+    WithDeadline(GrpcTestClient::withDeadline),
+    WithTimeout(GrpcTestClient::withTimeout);
+
+    private final Function<TimeDuration, StreamObserverFactory> factory;
+
+    Type(Function<TimeDuration, StreamObserverFactory> function) {
+      this.factory = function;
+    }
+
+    StreamObserverFactory createFunction(TimeDuration timeout) {
+      return factory.apply(timeout);
+    }
+  }
+
+  @Test
+  public void testWithDeadline() throws Exception {
+    // The sum of all the sleep times is within the deadline.
+    runTestTimeout(2, Type.WithDeadline);
+  }
+
+  @Test
+  public void testWithDeadlineFailure() {
+    // DEADLINE_EXCEEDED is expected.
+    testFailureCase("total sleep time is longer than the deadline",
+        () -> runTestTimeout(5, Type.WithDeadline),
+        ExecutionException.class, StatusRuntimeException.class);
+  }
+
+  @Test
+  public void testWithTimeout() throws Exception {
+    // Each individual sleep time is within the timeout.
+    // The sum of the sleep times exceeds the timeout, which is fine for a per-request timeout.
+    runTestTimeout(5, Type.WithTimeout);
+  }
+
+  /**
+   * Send 2*slow messages to a server which replies the first {@code slow} requests in time.
+   * The first {@code slow} replies must succeed and all the remaining replies must fail.
+   */
+  void runTestTimeout(int slow, Type type) throws Exception {
+    LOG.info("slow = {}, {}", slow, type);
+    final TimeDuration timeout = ONE_SECOND.multiply(0.5);
+    final StreamObserverFactory observerFactory = type.createFunction(timeout);
+    final InetSocketAddress address = NetUtils.createLocalServerAddress();
+    final List<String> messages = newMessages(2 * slow);
+
+    try (GrpcTestServer server = new GrpcTestServer(address.getPort(), slow, timeout)) {
+      final int port = server.start();
+      try (GrpcTestClient client = new GrpcTestClient(address.getHostName(), port, observerFactory)) {
+        final boolean sleepBeforeSend = type == Type.WithTimeout;
+        final List<CompletableFuture<String>> futures = new ArrayList<>();
+        for (String message : messages) {
+          if (sleepBeforeSend) {
+            timeout.sleep();
+          }
+          futures.add(client.send(message));
+        }
+
+        // A single index is shared by the two phases: successes first and then failures.
+        int index = 0;
+        while (index < slow) {
+          assertReply(index, messages.get(index), futures.get(index));
+          index++;
+        }
+        while (index < messages.size()) {
+          assertFailure(index, futures.get(index));
+          index++;
+        }
+      }
+    }
+  }
+
+  /** @return the messages "m0", "m1", ..., one for each index below the given number. */
+  private static List<String> newMessages(int numMessages) {
+    final List<String> list = new ArrayList<>();
+    for (int k = 0; k < numMessages; k++) {
+      list.add("m" + k);
+    }
+    return list;
+  }
+
+  /** Assert that the future completes with the reply expected for the i-th message. */
+  private void assertReply(int i, String message, CompletableFuture<String> future) throws Exception {
+    final String expected = i + GrpcTestServer.GreeterImpl.toReplySuffix(message);
+    final String reply = future.get();
+    Assert.assertEquals("expected = " + expected + " != reply = " + reply, expected, reply);
+    LOG.info("{}) passed", i);
+  }
+
+  /** Assert that the future fails with an {@link ExecutionException}. */
+  private void assertFailure(int i, CompletableFuture<String> future) throws InterruptedException {
+    final String reply;
+    try {
+      reply = future.get();
+    } catch (ExecutionException e) {
+      LOG.info("GOOD! {}) {}, {}", i, StringUtils.completableFuture2String(future, true), e);
+      return;
+    }
+    Assert.fail(i + ") reply = " + reply + ", "
+        + StringUtils.completableFuture2String(future, false));
+  }
+}