You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/06/14 09:32:51 UTC
incubator-ratis git commit: RATIS-233. Throw an exception for the
delayed requests if the leader is stepping down. Contributed by Kit Hui
Repository: incubator-ratis
Updated Branches:
refs/heads/master 1b753aeba -> ccc380119
RATIS-233. Throw an exception for the delayed requests if the leader is stepping down. Contributed by Kit Hui
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ccc38011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ccc38011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ccc38011
Branch: refs/heads/master
Commit: ccc380119edb31631943b91b1f2f0fa70fc3bf86
Parents: 1b753ae
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Jun 14 17:32:10 2018 +0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Jun 14 17:32:10 2018 +0800
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 17 ++++++-
.../ratis/protocol/NotReplicatedException.java | 46 +++++++++++++++++++
.../apache/ratis/protocol/RaftClientReply.java | 17 +++++--
.../ratis/protocol/RaftClientRequest.java | 3 +-
ratis-proto-shaded/src/main/proto/Raft.proto | 9 +++-
.../ratis/server/impl/PendingRequest.java | 12 +++--
.../ratis/server/impl/PendingRequests.java | 10 +++++
.../java/org/apache/ratis/RaftAsyncTests.java | 8 ++++
.../java/org/apache/ratis/RaftBasicTests.java | 47 ++++++++++++++++++++
9 files changed, 158 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4d858e4..f32d6b9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,6 +26,7 @@ import org.apache.ratis.util.ReflectionUtils;
import java.util.Arrays;
import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
public interface ClientProtoUtils {
@@ -163,6 +164,15 @@ public interface ClientProtoUtils {
.setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace()));
b.setStateMachineException(smeBuilder.build());
}
+
+ final NotReplicatedException nre = reply.getNotReplicatedException();
+ if (nre != null) {
+ final NotReplicatedExceptionProto.Builder nreBuilder = NotReplicatedExceptionProto.newBuilder()
+ .setCallId(nre.getCallId())
+ .setReplication(nre.getRequiredReplication())
+ .setLogIndex(nre.getLogIndex());
+ b.setNotReplicatedException(nreBuilder);
+ }
}
return b.build();
}
@@ -186,7 +196,7 @@ public interface ClientProtoUtils {
static RaftClientReply toRaftClientReply(
RaftClientReplyProto replyProto) {
final RaftRpcReplyProto rp = replyProto.getRpcReply();
- RaftException e = null;
+ final RaftException e;
if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) {
NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
@@ -195,11 +205,16 @@ public interface ClientProtoUtils {
nleProto.getPeersInConfList());
e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()),
suggestedLeader, peers);
+ } else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION) {
+ final NotReplicatedExceptionProto nre = replyProto.getNotReplicatedException();
+ e = new NotReplicatedException(nre.getCallId(), nre.getReplication(), nre.getLogIndex());
} else if (replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) {
StateMachineExceptionProto smeProto = replyProto.getStateMachineException();
e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
smeProto.getStacktrace());
+ } else {
+ e = null;
}
ClientId clientId = ClientId.valueOf(rp.getRequestorId());
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
new file mode 100644
index 0000000..67bda34
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+
+/**
+ * A {@link RaftException} that records the call id, the required
+ * {@link ReplicationLevel} and the log index of a request.
+ * Its message says that the request is committed with that log index
+ * but not yet replicated to the required level.
+ */
+public class NotReplicatedException extends RaftException {
+  private final long callId;
+  private final ReplicationLevel requiredReplication;
+  private final long logIndex;
+
+  /**
+   * @param callId the call id of the request
+   * @param requiredReplication the replication level named in the message as not yet reached
+   * @param logIndex the log index named in the message
+   */
+  public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) {
+    super(toMessage(callId, requiredReplication, logIndex));
+    this.callId = callId;
+    this.requiredReplication = requiredReplication;
+    this.logIndex = logIndex;
+  }
+
+  /** Build the exception message from the three constructor arguments. */
+  private static String toMessage(long callId, ReplicationLevel requiredReplication, long logIndex) {
+    return "Request with call Id " + callId + " is committed with log index " + logIndex
+        + " but not yet replicated to " + requiredReplication;
+  }
+
+  /** @return the call id passed to the constructor. */
+  public long getCallId() {
+    return callId;
+  }
+
+  /** @return the required replication level passed to the constructor. */
+  public ReplicationLevel getRequiredReplication() {
+    return requiredReplication;
+  }
+
+  /** @return the log index passed to the constructor. */
+  public long getLogIndex() {
+    return logIndex;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 8254ab4..ba3cdc7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -55,16 +55,15 @@ public class RaftClientReply extends RaftClientMessage {
this.callId = callId;
this.message = message;
this.exception = exception;
+ this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList();
if (exception != null) {
Preconditions.assertTrue(!success,
() -> "Inconsistent parameters: success && exception != null: " + this);
- Preconditions.assertTrue(
- ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
+ Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
+ NotLeaderException.class, NotReplicatedException.class, StateMachineException.class),
() -> "Unexpected exception class: " + this);
}
-
- this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList();
}
public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) {
@@ -81,6 +80,11 @@ public class RaftClientReply extends RaftClientMessage {
request.getCallId(), true, message, null, commitInfos);
}
+ /**
+ * Create an unsuccessful reply (success is false) that takes its client id, server id,
+ * group id, call id, message and commit infos from the given reply
+ * and carries the given {@link NotReplicatedException}.
+ */
+ public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
+ this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
+ reply.getCallId(), false, reply.getMessage(), nre, reply.getCommitInfos());
+ }
+
/**
* Get the commit information for the entire group.
* The commit information may be unavailable for exception reply.
@@ -120,6 +124,11 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, NotLeaderException.class);
}
+ /**
+ * If this reply has {@link NotReplicatedException}, return it; otherwise return null.
+ *
+ * @return the exception of this reply as a {@link NotReplicatedException}, or null.
+ */
+ public NotReplicatedException getNotReplicatedException() {
+ return JavaUtils.cast(exception, NotReplicatedException.class);
+ }
+
/** If this reply has {@link StateMachineException}, return it; otherwise return null. */
public StateMachineException getStateMachineException() {
return JavaUtils.cast(exception, StateMachineException.class);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 232c51d..34c96f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -122,7 +122,8 @@ public class RaftClientRequest extends RaftClientMessage {
public String toString() {
switch (typeCase) {
case WRITE:
- return "RW";
+ final ReplicationLevel replication = write.getReplication();
+ return "RW" + (replication == ReplicationLevel.MAJORITY? "": "-" + replication);
case READ:
return "RO";
case STALEREAD:
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 3f0baf8..303ec2b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -203,6 +203,12 @@ message NotLeaderExceptionProto {
repeated RaftPeerProto peersInConf = 2;
}
+// Wire form of NotReplicatedException: the call id of the request,
+// the required replication level and the log index.
+message NotReplicatedExceptionProto {
+ uint64 callId = 1;
+ ReplicationLevel replication = 2;
+ uint64 logIndex = 3;
+}
+
message StateMachineExceptionProto {
string exceptionClassName = 1;
string errorMsg = 2;
@@ -215,7 +221,8 @@ message RaftClientReplyProto {
oneof ExceptionDetails {
NotLeaderExceptionProto notLeaderException = 3;
+ // stateMachineException keeps field number 4. Renumbering an existing field and
+ // reusing its number for a different message type would make replies undecodable
+ // between peers built before and after this change, so the new field takes 5.
StateMachineExceptionProto stateMachineException = 4;
+ NotReplicatedExceptionProto notReplicatedException = 5;
}
repeated CommitInfoProto commitInfos = 15;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 10cd95f..4a72197 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,10 +17,8 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
@@ -86,6 +84,12 @@ public class PendingRequest implements Comparable<PendingRequest> {
setReply(delayed);
}
+  /**
+   * Fail the delayed reply: set the reply to a new {@link RaftClientReply} built from the
+   * delayed reply and a {@link NotReplicatedException} that carries the request's call id,
+   * the replication level of its write type and the index of this pending request.
+   */
+  synchronized void failDelayedReply() {
+    final ReplicationLevel required = request.getType().getWrite().getReplication();
+    final NotReplicatedException nre = new NotReplicatedException(request.getCallId(), required, index);
+    setReply(new RaftClientReply(delayed, nre));
+  }
+
TransactionContext setNotLeaderException(NotLeaderException nle) {
setReply(new RaftClientReply(getRequest(), nle, null));
return getEntry();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c02d7c3..2cf1271 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -116,9 +116,18 @@ class PendingRequests {
polled = q.poll();
Preconditions.assertTrue(polled == peeked);
}
+ LOG.debug("{}: complete delay request {}", name, polled);
polled.completeDelayedReply();
}
}
+
+    /** Drain the queue, failing the delayed reply of every request polled from it. */
+    void failReplies() {
+      synchronized (q) {
+        while (!q.isEmpty()) {
+          q.poll().failDelayedReply();
+        }
+      }
+    }
}
private PendingRequest pendingSetConf;
@@ -215,6 +224,7 @@ class PendingRequests {
if (pendingSetConf != null) {
pendingSetConf.setNotLeaderException(nle);
}
+ delayedReplies.failReplies();
}
void checkDelayedReplies(long allAckedIndex) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 53e0f1f..b297a27 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -48,6 +48,7 @@ import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.ratis.RaftBasicTests.runTestDelayRequestIfLeaderStepDown;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
@@ -309,4 +310,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
//reset for the other tests
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
}
+
+  /**
+   * Run the async variant of the delayed-request test on a freshly started
+   * five-server cluster.
+   */
+  @Test
+  public void testAsyncDelayRequestIfLeaderStepDown() throws Exception {
+    final CLUSTER cluster = newCluster(5);
+    cluster.start();
+    try {
+      runTestDelayRequestIfLeaderStepDown(true, cluster, LOG);
+    } finally {
+      // This test owns the cluster it created; stop it even when the test body fails
+      // so that its servers do not keep running during the following tests.
+      cluster.shutdown();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 207a458..8c744ab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.NotReplicatedException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
@@ -50,6 +51,7 @@ import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -451,4 +453,49 @@ public abstract class RaftBasicTests extends BaseTest {
Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
}
}
+
+ /** Run the blocking (async is false) variant of the test on the cluster from getCluster(). */
+ @Test
+ public void testDelayRequestIfLeaderStepDown() throws Exception {
+ runTestDelayRequestIfLeaderStepDown(false, getCluster(), LOG);
+ }
+
+  /**
+   * Send one message with {@link ReplicationLevel#ALL} and, about one second later, call
+   * {@link RaftTestUtil#changeLeader} for the current leader. The reply is expected to
+   * carry a {@link NotReplicatedException}.
+   *
+   * Before sending, the first server of the cluster is killed and every other server is
+   * restarted (presumably so that the ALL replication level cannot be reached -- confirm).
+   *
+   * @param async true to use sendAsync and change the leader from the calling thread;
+   *              false to use the blocking send and change the leader from a helper thread.
+   * @param cluster the cluster to run against.
+   * @param LOG the logger used for progress and for the expected exception.
+   */
+  static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
+    boolean firstServerKilled = false;
+    for (RaftServer s : cluster.getServers()) {
+      if (!firstServerKilled) {
+        firstServerKilled = true;
+        cluster.killServer(s.getId());
+        continue;
+      }
+      cluster.restartServer(s.getId(), false);
+    }
+    final RaftServerImpl leader = waitForLeader(cluster);
+    LOG.info("leader: " + leader.getId() + ", " + cluster.printServers());
+
+    final SimpleMessage message = SimpleMessage.create(1)[0];
+    try (final RaftClient client = cluster.createClientWithLeader()) {
+      final RaftClientReply reply;
+      if (async) {
+        final CompletableFuture<RaftClientReply> f = client.sendAsync(message, ReplicationLevel.ALL);
+        Thread.sleep(1000);
+        RaftTestUtil.changeLeader(cluster, leader.getId());
+
+        reply = f.get();
+      } else {
+        new Thread(() -> {
+          try {
+            Thread.sleep(1000);
+            RaftTestUtil.changeLeader(cluster, leader.getId());
+          } catch (Exception e) {
+            LOG.warn("changeLeader", e);
+          }
+        }).start();
+
+        reply = client.send(message, ReplicationLevel.ALL);
+      }
+      final NotReplicatedException nre = reply.getNotReplicatedException();
+      // Without this check a reply lacking the exception would surface as the
+      // NullPointerException raised by "throw null", hiding what actually came back.
+      Assert.assertNotNull("Expected a NotReplicatedException in reply " + reply, nre);
+      throw nre;
+    } catch (NotReplicatedException e) {
+      LOG.info("Expected", e);
+    }
+  }
}