You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/11 01:04:40 UTC

[ratis] branch master updated: RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 99d8add2b RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)
99d8add2b is described below

commit 99d8add2bb0caf953b2b5d3c8468c69edf0651c0
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Oct 11 09:04:34 2022 +0800

    RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)
---
 .../apache/ratis/client/impl/ClientProtoUtils.java | 62 ++++++++++++++++++++--
 .../org/apache/ratis/protocol/RaftClientReply.java |  8 ++-
 .../protocol/exceptions/ReadIndexException.java    | 32 +++++++++++
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  5 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  5 +-
 .../org/apache/ratis/ReadOnlyRequestTests.java     | 41 ++++++++++++--
 .../ratis/ReadOnlyRequestWithLongTimeoutTests.java | 30 +++++++++++
 8 files changed, 173 insertions(+), 11 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4496d2092..bb7d25169 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -17,11 +17,54 @@
  */
 package org.apache.ratis.client.impl;
 
-import java.nio.ByteBuffer;
-import java.util.Optional;
-
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.AlreadyClosedExceptionProto;
+import org.apache.ratis.proto.RaftProtos.ClientMessageEntryProto;
+import org.apache.ratis.proto.RaftProtos.GroupAddRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
+import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupListReplyProto;
+import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupRemoveRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionPauseRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionResumeRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderNotReadyExceptionProto;
+import org.apache.ratis.proto.RaftProtos.NotLeaderExceptionProto;
+import org.apache.ratis.proto.RaftProtos.NotReplicatedExceptionProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.RouteProto;
+import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.SnapshotCreateRequestProto;
+import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineExceptionProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
+import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListReply;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -30,16 +73,18 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.ReflectionUtils;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
@@ -49,6 +94,7 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READINDEXEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
 
@@ -309,6 +355,10 @@ public interface ClientProtoUtils {
           .map(ProtoUtils::toThrowableProto)
           .ifPresent(b::setReadException);
 
+      Optional.ofNullable(reply.getReadIndexException())
+          .map(ProtoUtils::toThrowableProto)
+          .ifPresent(b::setReadIndexException);
+
       final RaftClientReplyProto serialized = b.build();
       final RaftException e = reply.getException();
       if (e != null) {
@@ -400,6 +450,8 @@ public interface ClientProtoUtils {
       e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(), TransferLeadershipException.class);
     } else if (replyProto.getExceptionDetailsCase().equals(READEXCEPTION)) {
       e = ProtoUtils.toThrowable(replyProto.getReadException(), ReadException.class);
+    } else if (replyProto.getExceptionDetailsCase().equals(READINDEXEXCEPTION)) {
+      e = ProtoUtils.toThrowable(replyProto.getReadIndexException(), ReadIndexException.class);
     } else {
       e = null;
     }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 6d29c452c..64d667955 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.util.JavaUtils;
@@ -170,7 +171,8 @@ public class RaftClientReply extends RaftClientMessage {
           AlreadyClosedException.class,
           NotLeaderException.class, NotReplicatedException.class,
           LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class,
-          LeaderSteppingDownException.class, TransferLeadershipException.class, ReadException.class),
+          LeaderSteppingDownException.class, TransferLeadershipException.class, ReadException.class,
+          ReadIndexException.class),
           () -> "Unexpected exception class: " + this);
     }
   }
@@ -250,6 +252,10 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, ReadException.class);
   }
 
+  public ReadIndexException getReadIndexException() {
+    return JavaUtils.cast(exception, ReadIndexException.class);
+  }
+
   /** @return the exception, if there is any; otherwise, return null. */
   public RaftException getException() {
     return exception;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java
new file mode 100644
index 000000000..e04091dad
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+
+/**
+ * Indicates that obtaining the read index for a read-only request failed; clients may choose to retry.
+ */
+public class ReadIndexException extends RaftException {
+
+  public ReadIndexException(String message) {
+    super(message);
+  }
+  public ReadIndexException(String message, Throwable throwable) {
+    super(message, throwable);
+  }
+}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1fd2a9d1e..d90e57932 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -409,6 +409,7 @@ message RaftClientReplyProto {
     ThrowableProto leaderSteppingDownException = 9;
     ThrowableProto transferLeadershipException = 10;
     ThrowableProto readException = 11;
+    ThrowableProto readIndexException = 12;
   }
 
   uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 7ea0eec67..7aba10aa5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -38,6 +38,7 @@ import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
@@ -1088,7 +1089,9 @@ class LeaderStateImpl implements LeaderState {
 
     // leader has not committed any entries in this term, reject
     if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
-      return JavaUtils.completeExceptionally(new LeaderNotReadyException(server.getMemberId()));
+      return JavaUtils.completeExceptionally(new ReadIndexException(
+          "Failed to getReadIndex " + readIndex + " since the term is not yet committed.",
+          new LeaderNotReadyException(server.getMemberId())));
     }
 
     final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8c37e6273..e6fbbd0fe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
 import org.apache.ratis.protocol.exceptions.SetConfigurationException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -947,7 +948,7 @@ class RaftServerImpl implements RaftServer.Division,
           if (reply.getServerReply().getSuccess()) {
             return reply.getReadIndex();
           } else {
-            throw new CompletionException(new ReadException(getId() +
+            throw new CompletionException(new ReadIndexException(getId() +
                 ": Failed to get read index from the leader: " + reply));
           }
         });
@@ -974,6 +975,8 @@ class RaftServerImpl implements RaftServer.Division,
       return newExceptionReply(request, (StateMachineException) e);
     } else if (e instanceof ReadException) {
       return newExceptionReply(request, (ReadException) e);
+    } else if (e instanceof ReadIndexException) {
+      return newExceptionReply(request, (ReadIndexException) e);
     } else {
       throw new CompletionException(e);
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 11a34bf52..3d17fc9b4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -24,7 +24,10 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.retry.ExceptionDependentRetry;
 import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -32,6 +35,7 @@ import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -107,7 +111,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
         CompletableFuture<RaftClientReply> result = client.async().send(incrementMessage);
         client.admin().transferLeadership(null, 200);
 
-        Assert.assertThrows(ReadException.class, () -> {
+        Assert.assertThrows(ReadIndexException.class, () -> {
           RaftClientReply timeoutReply = noRetry.io().sendReadOnly(queryMessage);
           Assert.assertNotNull(timeoutReply.getException());
           Assert.assertTrue(timeoutReply.getException() instanceof ReadException);
@@ -199,7 +203,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
          // read timeout quicker than election timeout
          leaderClient.admin().transferLeadership(null, 200);
 
-         Assert.assertThrows(ReadException.class, () -> {
+         Assert.assertThrows(ReadIndexException.class, () -> {
            followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
          });
       }
@@ -209,6 +213,37 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
+  @Test
+  public void testFollowerLinearizableReadRetryWhenLeaderDown() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadRetryWhenLeaderDown);
+  }
+
+  private void testFollowerLinearizableReadRetryWhenLeaderDown(CLUSTER cluster) throws Exception {
+    // Retry (forever, sleeping 100ms between attempts) only on ReadIndexException; never retry on anything else.
+    final RetryPolicy retryPolicy = ExceptionDependentRetry
+        .newBuilder()
+        .setDefaultPolicy(RetryPolicies.noRetry())
+        .setExceptionToPolicy(ReadIndexException.class,
+            RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(100, TimeUnit.MILLISECONDS)))
+        .build();
+
+    RaftTestUtil.waitForLeader(cluster);
+
+    try (RaftClient client = cluster.createClient(cluster.getLeader().getId(), retryPolicy)) {
+      client.io().send(incrementMessage);
+
+      final RaftClientReply clientReply = client.io().sendReadOnly(queryMessage);
+      Assert.assertEquals(1, retrieve(clientReply));
+
+      // kill the leader
+      client.admin().transferLeadership(null, 200);
+
+      // the read-only request will succeed after re-election; verify the NEW reply, not the earlier one
+      final RaftClientReply replySuccess = client.io().sendReadOnly(queryMessage);
+      Assert.assertEquals(1, retrieve(replySuccess));
+    }
+  }
+
   static int retrieve(RaftClientReply reply) {
     return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
   }
@@ -253,7 +288,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
     }
 
     private void timeoutIncrement() {
-      sleepQuietly(1500);
+      sleepQuietly(2500);
       increment();
     }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
index 737b8452c..4d43eaa55 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
@@ -23,6 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -34,6 +36,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRaftCluster>
@@ -48,9 +51,11 @@ public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRa
 
   static final String INCREMENT = "INCREMENT";
   static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+  static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
   static final String QUERY = "QUERY";
   final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
   final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+  final Message timeoutMessage = new RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
   final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
 
   @Before
@@ -92,4 +97,29 @@ public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRa
       Assert.assertEquals(2, ReadOnlyRequestTests.retrieve(linearizableReadValue));
     }
   }
+
+  @Test
+  public void testLinearizableReadTimeout() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
+  }
+
+  private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+
+    try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+      final RaftClientReply reply = client.io().send(incrementMessage);
+      Assert.assertTrue(reply.isSuccess());
+
+      final CompletableFuture<RaftClientReply> asyncTimeoutReply = client.async().send(timeoutMessage);
+      Thread.sleep(100);
+
+      Assert.assertThrows(ReadException.class, () -> {
+        final RaftClientReply timeoutReply = client.io().sendReadOnly(queryMessage);
+        Assert.assertTrue(timeoutReply.getException().getCause() instanceof TimeoutIOException);
+      });
+
+      asyncTimeoutReply.join();
+    }
+  }
 }