You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/11 01:04:40 UTC
[ratis] branch master updated: RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 99d8add2b RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)
99d8add2b is described below
commit 99d8add2bb0caf953b2b5d3c8468c69edf0651c0
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Oct 11 09:04:34 2022 +0800
RATIS-1716. Separate ReadException and ReadIndexException for client retry (#758)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 62 ++++++++++++++++++++--
.../org/apache/ratis/protocol/RaftClientReply.java | 8 ++-
.../protocol/exceptions/ReadIndexException.java | 32 +++++++++++
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/LeaderStateImpl.java | 5 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 5 +-
.../org/apache/ratis/ReadOnlyRequestTests.java | 41 ++++++++++++--
.../ratis/ReadOnlyRequestWithLongTimeoutTests.java | 30 +++++++++++
8 files changed, 173 insertions(+), 11 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4496d2092..bb7d25169 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -17,11 +17,54 @@
*/
package org.apache.ratis.client.impl;
-import java.nio.ByteBuffer;
-import java.util.Optional;
-
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.proto.RaftProtos.AlreadyClosedExceptionProto;
+import org.apache.ratis.proto.RaftProtos.ClientMessageEntryProto;
+import org.apache.ratis.proto.RaftProtos.GroupAddRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupInfoReplyProto;
+import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupListReplyProto;
+import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.GroupRemoveRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionPauseRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderElectionResumeRequestProto;
+import org.apache.ratis.proto.RaftProtos.LeaderNotReadyExceptionProto;
+import org.apache.ratis.proto.RaftProtos.NotLeaderExceptionProto;
+import org.apache.ratis.proto.RaftProtos.NotReplicatedExceptionProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.RouteProto;
+import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.SnapshotCreateRequestProto;
+import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineExceptionProto;
+import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
+import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.GroupListReply;
+import org.apache.ratis.protocol.GroupListRequest;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.LeaderElectionManagementRequest;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.SnapshotManagementRequest;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -30,16 +73,18 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
@@ -49,6 +94,7 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.READINDEXEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
@@ -309,6 +355,10 @@ public interface ClientProtoUtils {
.map(ProtoUtils::toThrowableProto)
.ifPresent(b::setReadException);
+ Optional.ofNullable(reply.getReadIndexException())
+ .map(ProtoUtils::toThrowableProto)
+ .ifPresent(b::setReadIndexException);
+
final RaftClientReplyProto serialized = b.build();
final RaftException e = reply.getException();
if (e != null) {
@@ -400,6 +450,8 @@ public interface ClientProtoUtils {
e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(), TransferLeadershipException.class);
} else if (replyProto.getExceptionDetailsCase().equals(READEXCEPTION)) {
e = ProtoUtils.toThrowable(replyProto.getReadException(), ReadException.class);
+ } else if (replyProto.getExceptionDetailsCase().equals(READINDEXEXCEPTION)) {
+ e = ProtoUtils.toThrowable(replyProto.getReadIndexException(), ReadIndexException.class);
} else {
e = null;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 6d29c452c..64d667955 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.util.JavaUtils;
@@ -170,7 +171,8 @@ public class RaftClientReply extends RaftClientMessage {
AlreadyClosedException.class,
NotLeaderException.class, NotReplicatedException.class,
LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class,
- LeaderSteppingDownException.class, TransferLeadershipException.class, ReadException.class),
+ LeaderSteppingDownException.class, TransferLeadershipException.class, ReadException.class,
+ ReadIndexException.class),
() -> "Unexpected exception class: " + this);
}
}
@@ -250,6 +252,10 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, ReadException.class);
}
+ public ReadIndexException getReadIndexException() {
+ return JavaUtils.cast(exception, ReadIndexException.class);
+ }
+
/** @return the exception, if there is any; otherwise, return null. */
public RaftException getException() {
return exception;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java
new file mode 100644
index 000000000..e04091dad
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadIndexException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+
+/**
+ * Thrown when the read index cannot be obtained (e.g. the leader has not yet committed an entry in its current term); unlike {@link ReadException}, the read may be retried.
+ */
+public class ReadIndexException extends RaftException {
+
+ public ReadIndexException(String message) {
+ super(message);
+ }
+ public ReadIndexException(String message, Throwable throwable) {
+ super(message, throwable);
+ }
+}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 1fd2a9d1e..d90e57932 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -409,6 +409,7 @@ message RaftClientReplyProto {
ThrowableProto leaderSteppingDownException = 9;
ThrowableProto transferLeadershipException = 10;
ThrowableProto readException = 11;
+ ThrowableProto readIndexException = 12;
}
uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 7ea0eec67..7aba10aa5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -38,6 +38,7 @@ import org.apache.ratis.protocol.TransferLeadershipRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
@@ -1088,7 +1089,9 @@ class LeaderStateImpl implements LeaderState {
// leader has not committed any entries in this term, reject
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != server.getState().getCurrentTerm()) {
- return JavaUtils.completeExceptionally(new LeaderNotReadyException(server.getMemberId()));
+ return JavaUtils.completeExceptionally(new ReadIndexException(
+ "Failed to getReadIndex " + readIndex + " since the term is not yet committed.",
+ new LeaderNotReadyException(server.getMemberId())));
}
final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8c37e6273..e6fbbd0fe 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.SetConfigurationException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -947,7 +948,7 @@ class RaftServerImpl implements RaftServer.Division,
if (reply.getServerReply().getSuccess()) {
return reply.getReadIndex();
} else {
- throw new CompletionException(new ReadException(getId() +
+ throw new CompletionException(new ReadIndexException(getId() +
": Failed to get read index from the leader: " + reply));
}
});
@@ -974,6 +975,8 @@ class RaftServerImpl implements RaftServer.Division,
return newExceptionReply(request, (StateMachineException) e);
} else if (e instanceof ReadException) {
return newExceptionReply(request, (ReadException) e);
+ } else if (e instanceof ReadIndexException) {
+ return newExceptionReply(request, (ReadIndexException) e);
} else {
throw new CompletionException(e);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
index 11a34bf52..3d17fc9b4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -24,7 +24,10 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.retry.ExceptionDependentRetry;
import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -32,6 +35,7 @@ import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -107,7 +111,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
CompletableFuture<RaftClientReply> result = client.async().send(incrementMessage);
client.admin().transferLeadership(null, 200);
- Assert.assertThrows(ReadException.class, () -> {
+ Assert.assertThrows(ReadIndexException.class, () -> {
RaftClientReply timeoutReply = noRetry.io().sendReadOnly(queryMessage);
Assert.assertNotNull(timeoutReply.getException());
Assert.assertTrue(timeoutReply.getException() instanceof ReadException);
@@ -199,7 +203,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
// read timeout quicker than election timeout
leaderClient.admin().transferLeadership(null, 200);
- Assert.assertThrows(ReadException.class, () -> {
+ Assert.assertThrows(ReadIndexException.class, () -> {
followerClient1.io().sendReadOnly(queryMessage, followers.get(0).getId());
});
}
@@ -209,6 +213,37 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
}
}
+ @Test
+ public void testFollowerLinearizableReadRetryWhenLeaderDown() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadRetryWhenLeaderDown);
+ }
+
+ private void testFollowerLinearizableReadRetryWhenLeaderDown(CLUSTER cluster) throws Exception {
+ // retry (forever, 100ms apart) only on ReadIndexException; fail fast on any other exception
+ final RetryPolicy retryPolicy = ExceptionDependentRetry
+ .newBuilder()
+ .setDefaultPolicy(RetryPolicies.noRetry())
+ .setExceptionToPolicy(ReadIndexException.class,
+ RetryPolicies.retryForeverWithSleep(TimeDuration.valueOf(100, TimeUnit.MILLISECONDS)))
+ .build();
+
+ RaftTestUtil.waitForLeader(cluster);
+
+ try (RaftClient client = cluster.createClient(cluster.getLeader().getId(), retryPolicy)) {
+ client.io().send(incrementMessage);
+
+ final RaftClientReply clientReply = client.io().sendReadOnly(queryMessage);
+ Assert.assertEquals(1, retrieve(clientReply));
+
+ // trigger a leadership change via transferLeadership (no explicit target)
+ client.admin().transferLeadership(null, 200);
+
+ // the read-only request is expected to succeed after re-election (retried on ReadIndexException)
+ final RaftClientReply replySuccess = client.io().sendReadOnly(queryMessage);
+ Assert.assertEquals(1, retrieve(replySuccess));
+ }
+ }
+
static int retrieve(RaftClientReply reply) {
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
}
@@ -253,7 +288,7 @@ public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
}
private void timeoutIncrement() {
- sleepQuietly(1500);
+ sleepQuietly(2500);
increment();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
index 737b8452c..4d43eaa55 100644
--- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestWithLongTimeoutTests.java
@@ -23,6 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -34,6 +36,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRaftCluster>
@@ -48,9 +51,11 @@ public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRa
static final String INCREMENT = "INCREMENT";
static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+ static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
static final String QUERY = "QUERY";
final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+ final Message timeoutMessage = new RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
@Before
@@ -92,4 +97,29 @@ public abstract class ReadOnlyRequestWithLongTimeoutTests<CLUSTER extends MiniRa
Assert.assertEquals(2, ReadOnlyRequestTests.retrieve(linearizableReadValue));
}
}
+
+ @Test
+ public void testLinearizableReadTimeout() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
+ }
+
+ private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+ final RaftClientReply reply = client.io().send(incrementMessage);
+ Assert.assertTrue(reply.isSuccess());
+
+ final CompletableFuture<RaftClientReply> asyncTimeoutReply = client.async().send(timeoutMessage);
+ Thread.sleep(100);
+
+ Assert.assertThrows(ReadException.class, () -> {
+ final RaftClientReply timeoutReply = client.io().sendReadOnly(queryMessage);
+ Assert.assertTrue(timeoutReply.getException().getCause() instanceof TimeoutIOException);
+ });
+
+ asyncTimeoutReply.join();
+ }
+ }
}