You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/07 04:33:00 UTC

[ratis] branch master updated: RATIS-1807. Support timeout in gRPC. (#842)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 69263c884 RATIS-1807. Support timeout in gRPC. (#842)
69263c884 is described below

commit 69263c884573b87eada10bdd269c9e9d31fb2e94
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 6 20:32:54 2023 -0800

    RATIS-1807. Support timeout in gRPC. (#842)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   7 +-
 .../grpc/server/GrpcServerProtocolClient.java      |   7 +-
 .../grpc/util/ResponseNotifyClientInterceptor.java |  72 ++++++++++++
 .../ratis/grpc/util/StreamObserverWithTimeout.java |  95 ++++++++++++++++
 ratis-proto/src/main/proto/Test.proto              |  37 +++++++
 .../org/apache/ratis/grpc/util/GrpcTestClient.java | 123 +++++++++++++++++++++
 .../org/apache/ratis/grpc/util/GrpcTestServer.java | 108 ++++++++++++++++++
 .../grpc/util/TestStreamObserverWithTimeout.java   | 122 ++++++++++++++++++++
 8 files changed, 566 insertions(+), 5 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index a61726177..1b954d6a7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -594,7 +594,8 @@ public class GrpcLogAppender extends LogAppenderBase {
     StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
     final String requestId = UUID.randomUUID().toString();
     try {
-      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot",
+          requestTimeoutDuration, responseHandler);
       for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
         if (isRunning()) {
           snapshotRequestObserver.onNext(request);
@@ -644,7 +645,9 @@ public class GrpcLogAppender extends LogAppenderBase {
       LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request));
     }
     try {
-      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot",
+          requestTimeoutDuration, responseHandler);
+
       snapshotRequestObserver.onNext(request);
       getFollower().updateLastRpcSendTime(false);
       responseHandler.addPending(request);
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 4c28c1df4..c3f8730e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.grpc.util.StreamObserverWithTimeout;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -136,9 +137,9 @@ public class GrpcServerProtocolClient implements Closeable {
   }
 
   StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .installSnapshot(responseHandler);
+      String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    return StreamObserverWithTimeout.newInstance(name, timeout,
+        i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler));
   }
 
   // short-circuit the backoff timer and make them reconnect immediately.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
new file mode 100644
index 000000000..77577b06d
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/ResponseNotifyClientInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.thirdparty.io.grpc.CallOptions;
+import org.apache.ratis.thirdparty.io.grpc.Channel;
+import org.apache.ratis.thirdparty.io.grpc.ClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall;
+import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener;
+import org.apache.ratis.thirdparty.io.grpc.Metadata;
+import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Consumer;
+
+/**
+ * Invoke the given notifier when receiving a response.
+ */
+public class ResponseNotifyClientInterceptor implements ClientInterceptor {
+  public static final Logger LOG = LoggerFactory.getLogger(ResponseNotifyClientInterceptor.class);
+
+  private final Consumer<Object> notifier;
+
+  public ResponseNotifyClientInterceptor(Consumer<Object> notifier) {
+    this.notifier = notifier;
+  }
+
+  @Override
+  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
+    LOG.debug("callOptions {}", callOptions);
+    return new Call<>(next.newCall(method, callOptions));
+  }
+
+  private final class Call<ReqT, RespT>
+      extends ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT> {
+
+    private Call(ClientCall<ReqT, RespT> delegate) {
+      super(delegate);
+    }
+
+    @Override
+    public void start(Listener<RespT> responseListener, Metadata headers) {
+      LOG.debug("start {}", headers);
+      super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
+        @Override
+        public void onMessage(RespT message) {
+          LOG.debug("onMessage {}", message);
+          notifier.accept(message);
+          super.onMessage(message);
+        }
+      }, headers);
+    }
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
new file mode 100644
index 000000000..2b875f3ed
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+
+public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
+  public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
+
+  public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout,
+      Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) {
+    final AtomicInteger responseCount = new AtomicInteger();
+    final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(
+        r -> responseCount.getAndIncrement());
+    return new StreamObserverWithTimeout<>(
+        name, timeout, responseCount::get, newStreamObserver.apply(interceptor));
+  }
+
+  private final String name;
+  private final TimeDuration timeout;
+  private final StreamObserver<T> observer;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
+  private final AtomicBoolean isClose = new AtomicBoolean();
+  private final AtomicInteger requestCount = new AtomicInteger();
+  private final IntSupplier responseCount;
+
+  private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount,
+      StreamObserver<T> observer) {
+    this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name;
+    this.timeout = timeout;
+    this.responseCount = responseCount;
+    this.observer = observer;
+  }
+
+  @Override
+  public void onNext(T request) {
+    observer.onNext(request);
+    final int id = requestCount.incrementAndGet();
+    scheduler.onTimeout(timeout, () -> handleTimeout(id, request),
+        LOG, () -> name + ": Timeout check failed for request: " + request);
+  }
+
+  private void handleTimeout(int id, T request) {
+    if (id > responseCount.getAsInt()) {
+      onError(new TimeoutIOException(name + ": Timed out " + timeout + " for sending request " + request));
+    }
+  }
+
+  @Override
+  public void onError(Throwable throwable) {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onError(throwable);
+    }
+  }
+
+  @Override
+  public void onCompleted() {
+    if (isClose.compareAndSet(false, true)) {
+      observer.onCompleted();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-proto/src/main/proto/Test.proto b/ratis-proto/src/main/proto/Test.proto
new file mode 100644
index 000000000..8d5769ff3
--- /dev/null
+++ b/ratis-proto/src/main/proto/Test.proto
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.ratis.test.proto";
+option java_outer_classname = "TestProto";
+
+package org.apache.ratis.test;
+
+service Greeter {
+    rpc Hello (stream HelloRequest)
+        returns (stream HelloReply) {}
+}
+
+message HelloRequest {
+    string name = 1;
+}
+
+message HelloReply {
+    string message = 1;
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
new file mode 100644
index 000000000..0923b27fe
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.GreeterGrpc.GreeterStub;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Deadline;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/** gRPC client for testing */
+class GrpcTestClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestClient.class);
+
+  @FunctionalInterface
+  interface StreamObserverFactory
+      extends BiFunction<GreeterStub, StreamObserver<HelloReply>, StreamObserver<HelloRequest>> {
+  }
+
+  static StreamObserverFactory withDeadline(TimeDuration timeout) {
+    final Deadline d = Deadline.after(timeout.getDuration(), timeout.getUnit());
+    return (stub, responseHandler) -> stub.withDeadline(d).hello(responseHandler);
+  }
+
+  static StreamObserverFactory withTimeout(TimeDuration timeout) {
+    return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout,
+        i -> stub.withInterceptors(i).hello(responseHandler));
+  }
+
+  private final ManagedChannel channel;
+  private final StreamObserver<HelloRequest> requestHandler;
+  private final Queue<CompletableFuture<String>> replies = new ConcurrentLinkedQueue<>();
+
+  GrpcTestClient(String host, int port, StreamObserverFactory factory) {
+    this.channel = ManagedChannelBuilder.forAddress(host, port)
+        .usePlaintext()
+        .build();
+
+    final GreeterStub asyncStub = GreeterGrpc.newStub(channel);
+    final StreamObserver<HelloReply> responseHandler = new StreamObserver<HelloReply>() {
+      @Override
+      public void onNext(HelloReply helloReply) {
+        replies.poll().complete(helloReply.getMessage());
+      }
+
+      @Override
+      public void onError(Throwable throwable) {
+        LOG.info("onError", throwable);
+        completeExceptionally(throwable);
+      }
+
+      @Override
+      public void onCompleted() {
+        LOG.info("onCompleted");
+        completeExceptionally(new IllegalStateException("onCompleted"));
+      }
+
+      void completeExceptionally(Throwable throwable) {
+        replies.forEach(f -> f.completeExceptionally(throwable));
+        replies.clear();
+      }
+    };
+
+    this.requestHandler = factory.apply(asyncStub, responseHandler);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      /* After the request handler is cancelled, no more life-cycle hooks are allowed,
+       * see {@link org.apache.ratis.thirdparty.io.grpc.ClientCall.Listener#cancel(String, Throwable)} */
+      // requestHandler.onCompleted();
+      channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  CompletableFuture<String> send(String name) {
+    LOG.info("send {}", name);
+    final HelloRequest request = HelloRequest.newBuilder().setName(name).build();
+    final CompletableFuture<String> f = new CompletableFuture<>();
+    try {
+      requestHandler.onNext(request);
+      replies.offer(f);
+    } catch (IllegalStateException e) {
+      // already closed
+      f.completeExceptionally(e);
+    }
+    return f;
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
new file mode 100644
index 000000000..ec9d63b13
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestServer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.test.proto.GreeterGrpc;
+import org.apache.ratis.test.proto.HelloReply;
+import org.apache.ratis.test.proto.HelloRequest;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/** gRPC server for testing */
+class GrpcTestServer implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(GrpcTestServer.class);
+
+  private final Server server;
+
+  GrpcTestServer(int port, int slow, TimeDuration timeout) {
+    this.server = ServerBuilder.forPort(port)
+        .addService(new GreeterImpl(slow, timeout))
+        .build();
+  }
+
+  int start() throws IOException {
+    server.start();
+    return server.getPort();
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException("Failed to close", e);
+    }
+  }
+
+  static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
+    static String toReplySuffix(String request) {
+      return ") Hello " + request;
+    }
+
+    private final int slow;
+    private final TimeDuration shortSleepTime;
+    private final TimeDuration longSleepTime;
+    private int count = 0;
+
+    GreeterImpl(int slow, TimeDuration timeout) {
+      this.slow = slow;
+      this.shortSleepTime = timeout.multiply(0.25);
+      this.longSleepTime = timeout.multiply(2);
+    }
+
+    @Override
+    public StreamObserver<HelloRequest> hello(StreamObserver<HelloReply> responseObserver) {
+      return new StreamObserver<HelloRequest>() {
+        @Override
+        public void onNext(HelloRequest helloRequest) {
+          final String reply = count + toReplySuffix(helloRequest.getName());
+          final TimeDuration sleepTime = count < slow ? shortSleepTime : longSleepTime;
+          LOG.info("count = {}, slow = {}, sleep {}", reply, slow, sleepTime);
+          try {
+            sleepTime.sleep();
+          } catch (InterruptedException e) {
+            responseObserver.onError(e);
+            return;
+          }
+          responseObserver.onNext(HelloReply.newBuilder().setMessage(reply).build());
+          count++;
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          LOG.error("onError", throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          responseObserver.onCompleted();
+        }
+      };
+    }
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
new file mode 100644
index 000000000..dac58812d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.util;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.grpc.util.GrpcTestClient.StreamObserverFactory;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+
+public class TestStreamObserverWithTimeout extends BaseTest {
+  {
+    Slf4jUtils.setLogLevel(ResponseNotifyClientInterceptor.LOG, Level.TRACE);
+  }
+
+  enum Type {
+    WithDeadline(GrpcTestClient::withDeadline),
+    WithTimeout(GrpcTestClient::withTimeout);
+
+    private final Function<TimeDuration, StreamObserverFactory> factory;
+
+    Type(Function<TimeDuration, StreamObserverFactory> function) {
+      this.factory = function;
+    }
+
+    StreamObserverFactory createFunction(TimeDuration timeout) {
+      return factory.apply(timeout);
+    }
+  }
+
+  @Test
+  public void testWithDeadline() throws Exception {
+    //the total sleep time is within the deadline
+    runTestTimeout(2, Type.WithDeadline);
+  }
+
+  @Test
+  public void testWithDeadlineFailure() {
+    //Expected to have DEADLINE_EXCEEDED
+    testFailureCase("total sleep time is longer than the deadline",
+        () -> runTestTimeout(5, Type.WithDeadline),
+        ExecutionException.class, StatusRuntimeException.class);
+  }
+
+  @Test
+  public void testWithTimeout() throws Exception {
+    //Each sleep time is within the timeout,
+    //Note that the total sleep time is longer than the timeout, but it does not matter.
+    runTestTimeout(5, Type.WithTimeout);
+  }
+
+  void runTestTimeout(int slow, Type type) throws Exception {
+    LOG.info("slow = {}, {}", slow, type);
+    final TimeDuration timeout = ONE_SECOND.multiply(0.5);
+    final StreamObserverFactory function = type.createFunction(timeout);
+    final InetSocketAddress address = NetUtils.createLocalServerAddress();
+
+    final List<String> messages = new ArrayList<>();
+    for (int i = 0; i < 2 * slow; i++) {
+      messages.add("m" + i);
+    }
+    try (GrpcTestServer server = new GrpcTestServer(address.getPort(), slow, timeout)) {
+      final int port = server.start();
+      try (GrpcTestClient client = new GrpcTestClient(address.getHostName(), port, function)) {
+
+        final List<CompletableFuture<String>> futures = new ArrayList<>();
+        for (String m : messages) {
+          if (type == Type.WithTimeout) {
+            timeout.sleep();
+          }
+          futures.add(client.send(m));
+        }
+
+        int i = 0;
+        for (; i < slow; i++) {
+          final String expected = i + GrpcTestServer.GreeterImpl.toReplySuffix(messages.get(i));
+          final String reply = futures.get(i).get();
+          Assert.assertEquals("expected = " + expected + " != reply = " + reply, expected, reply);
+          LOG.info("{}) passed", i);
+        }
+
+        for (; i < messages.size(); i++) {
+          final CompletableFuture<String> f = futures.get(i);
+          try {
+            final String reply = f.get();
+            Assert.fail(i + ") reply = " + reply + ", "
+                + StringUtils.completableFuture2String(f, false));
+          } catch (ExecutionException e) {
+             LOG.info("GOOD! {}) {}, {}", i, StringUtils.completableFuture2String(f, true), e);
+          }
+        }
+      }
+    }
+  }
+}