You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/03/16 20:09:54 UTC
incubator-ratis git commit: RATIS-38. RaftClient should not retry on
StateMachineException.
Repository: incubator-ratis
Updated Branches:
refs/heads/master f1716ac43 -> a25143eed
RATIS-38. RaftClient should not retry on StateMachineException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a25143ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a25143ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a25143ee
Branch: refs/heads/master
Commit: a25143eedb84a3da5ffba5d475314eca41a7ae48
Parents: f1716ac
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Mar 16 13:09:48 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Mar 16 13:09:48 2017 -0700
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 69 ++++++++++++++++----
.../ratis/client/impl/RaftClientImpl.java | 22 +++++--
.../apache/ratis/protocol/RaftClientReply.java | 32 ++++++---
.../ratis/protocol/StateMachineException.java | 2 +-
.../java/org/apache/ratis/util/RaftUtils.java | 11 ++++
.../java/org/apache/ratis/util/StringUtils.java | 10 +++
.../TestRaftStateMachineException.java | 2 +-
.../org/apache/ratis/grpc/RaftGrpcUtil.java | 24 +------
.../TestRaftReconfigurationWithHadoopRpc.java | 9 +++
ratis-proto-shaded/src/main/proto/Raft.proto | 20 ++++--
.../apache/ratis/server/impl/LeaderState.java | 5 +-
.../apache/ratis/server/impl/LogAppender.java | 2 +-
.../ratis/server/impl/PendingRequests.java | 11 +++-
.../ratis/server/impl/RaftServerImpl.java | 9 +--
.../ratis/server/impl/StateMachineUpdater.java | 4 +-
15 files changed, 164 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 218d761..ddecad6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -21,9 +21,13 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.protocol.*;
import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.RaftUtils;
import java.util.Arrays;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
+
public class ClientProtoUtils {
public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
byte[] requestorId, byte[] replyId, long callId, boolean success) {
@@ -83,14 +87,25 @@ public class ClientProtoUtils {
b.setMessage(toClientMessageEntryProto(reply.getMessage()));
}
if (reply.isNotLeader()) {
- b.setIsNotLeader(true);
- final RaftPeer suggestedLeader = reply.getNotLeaderException()
- .getSuggestedLeader();
+ NotLeaderException nle = reply.getNotLeaderException();
+ NotLeaderExceptionProto.Builder nleBuilder =
+ NotLeaderExceptionProto.newBuilder();
+ final RaftPeer suggestedLeader = nle.getSuggestedLeader();
if (suggestedLeader != null) {
- b.setSuggestedLeader(ProtoUtils.toRaftPeerProto(suggestedLeader));
+ nleBuilder.setSuggestedLeader(ProtoUtils.toRaftPeerProto(suggestedLeader));
}
- b.addAllPeersInConf(ProtoUtils.toRaftPeerProtos(
- Arrays.asList(reply.getNotLeaderException().getPeers())));
+ nleBuilder.addAllPeersInConf(
+ ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers())));
+ b.setNotLeaderException(nleBuilder.build());
+ } else if (reply.hasStateMachineException()) {
+ StateMachineException sme = reply.getStateMachineException();
+ StateMachineExceptionProto.Builder smeBuilder =
+ StateMachineExceptionProto.newBuilder();
+ final Throwable t = sme.getCause() != null ? sme.getCause() : sme;
+ smeBuilder.setExceptionClassName(t.getClass().getName())
+ .setErrorMsg(t.getMessage())
+ .setStacktrace(ProtoUtils.toByteString(t.getStackTrace()));
+ b.setStateMachineException(smeBuilder.build());
}
}
return b.build();
@@ -99,25 +114,53 @@ public class ClientProtoUtils {
public static RaftClientReply toRaftClientReply(
RaftClientReplyProto replyProto) {
final RaftRpcReplyProto rp = replyProto.getRpcReply();
- NotLeaderException e = null;
- if (replyProto.getIsNotLeader()) {
- final RaftPeer suggestedLeader = replyProto.hasSuggestedLeader() ?
- ProtoUtils.toRaftPeer(replyProto.getSuggestedLeader()) : null;
+ RaftException e = null;
+ if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) {
+ NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
+ final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
+ ProtoUtils.toRaftPeer(nleProto.getSuggestedLeader()) : null;
final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(
- replyProto.getPeersInConfList());
+ nleProto.getPeersInConfList());
e = new NotLeaderException(new RaftPeerId(rp.getReplyId()),
suggestedLeader, peers);
+ } else if (replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) {
+ StateMachineExceptionProto smeProto = replyProto.getStateMachineException();
+ e = wrapStateMachineException(rp.getReplyId().toStringUtf8(),
+ smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
+ smeProto.getStacktrace());
}
return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()),
new RaftPeerId(rp.getReplyId()),
rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
}
- public static Message toMessage(final ClientMessageEntryProto p) {
+ private static StateMachineException wrapStateMachineException(
+ String serverId, String className, String errorMsg,
+ ByteString stackTraceBytes) {
+ StateMachineException sme;
+ if (className == null) {
+ sme = new StateMachineException(errorMsg);
+ } else {
+ try {
+ Class<?> clazz = Class.forName(className);
+ final Exception e = RaftUtils.instantiateException(
+ clazz.asSubclass(Exception.class), errorMsg, null);
+ sme = new StateMachineException(serverId, e);
+ } catch (Exception e) {
+ sme = new StateMachineException(className + ": " + errorMsg);
+ }
+ }
+ StackTraceElement[] stacktrace =
+ (StackTraceElement[]) ProtoUtils.toObject(stackTraceBytes);
+ sme.setStackTrace(stacktrace);
+ return sme;
+ }
+
+ private static Message toMessage(final ClientMessageEntryProto p) {
return p::getContent;
}
- public static ClientMessageEntryProto toClientMessageEntryProto(Message message) {
+ private static ClientMessageEntryProto toClientMessageEntryProto(Message message) {
return ClientMessageEntryProto.newBuilder()
.setContent(message.getContent()).build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 4e5db47..eae42a5 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -25,10 +25,12 @@ import org.apache.ratis.util.RaftUtils;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
@@ -82,6 +84,9 @@ final class RaftClientImpl implements RaftClient {
public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
throws IOException {
final long callId = nextCallId();
+ // also refresh the rpc proxies for these peers
+ clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
+ .collect(Collectors.toCollection(ArrayList::new)));
return sendRequestWithRetry(() -> new SetConfigurationRequest(
clientId, leaderId, callId, peersInNewConf));
}
@@ -111,19 +116,21 @@ final class RaftClientImpl implements RaftClient {
private RaftClientReply sendRequest(RaftClientRequest request)
throws StateMachineException {
+ RaftClientReply reply = null;
try {
- RaftClientReply reply = clientRpc.sendRequest(request);
+ reply = clientRpc.sendRequest(request);
+ } catch (IOException ioe) {
+ handleIOException(request, ioe, null);
+ }
+ if (reply != null) {
if (reply.isNotLeader()) {
handleNotLeaderException(request, reply.getNotLeaderException());
return null;
+ } else if (reply.hasStateMachineException()) {
+ throw reply.getStateMachineException();
} else {
return reply;
}
- } catch (StateMachineException e) {
- throw e;
- } catch (IOException ioe) {
- // TODO different retry policies for different exceptions
- handleIOException(request, ioe, null);
}
return null;
}
@@ -147,7 +154,8 @@ final class RaftClientImpl implements RaftClient {
private void handleIOException(RaftClientRequest request, IOException ioe,
RaftPeerId newLeader) {
- LOG.debug("{}: Failed with {}", clientId, ioe);
+ LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId,
+ newLeader, ioe);
final RaftPeerId oldLeader = request.getServerId();
if (newLeader == null && oldLeader.equals(leaderId)) {
newLeader = RaftUtils.next(oldLeader, RaftUtils.as(peers, RaftPeer::getId));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 4dd2943..7179505 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -24,23 +24,27 @@ public class RaftClientReply extends RaftClientMessage {
private final boolean success;
private final long callId;
- /** non-null if the server is not leader */
- private final NotLeaderException notLeaderException;
+ /**
+ * We mainly track two types of exceptions here:
+ * 1. NotLeaderException if the server is not leader
+ * 2. StateMachineException if the server's state machine returns an exception
+ */
+ private final RaftException exception;
private final Message message;
public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId,
- boolean success, Message message, NotLeaderException notLeaderException) {
+ boolean success, Message message, RaftException exception) {
super(clientId, serverId);
this.success = success;
this.callId = callId;
this.message = message;
- this.notLeaderException = notLeaderException;
+ this.exception = exception;
}
public RaftClientReply(RaftClientRequest request,
- NotLeaderException notLeaderException) {
+ RaftException exception) {
this(request.getClientId(), request.getServerId(), request.getCallId(),
- false, null, notLeaderException);
+ false, null, exception);
}
public RaftClientReply(RaftClientRequest request, Message message) {
@@ -71,11 +75,21 @@ public class RaftClientReply extends RaftClientMessage {
return message;
}
+ public boolean isNotLeader() {
+ return exception instanceof NotLeaderException;
+ }
+
public NotLeaderException getNotLeaderException() {
- return notLeaderException;
+ assert isNotLeader();
+ return (NotLeaderException) exception;
}
- public boolean isNotLeader() {
- return notLeaderException != null;
+ public StateMachineException getStateMachineException() {
+ assert hasStateMachineException();
+ return (StateMachineException) exception;
+ }
+
+ public boolean hasStateMachineException() {
+ return exception instanceof StateMachineException;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 099133d..68d808b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -18,7 +18,7 @@
package org.apache.ratis.protocol;
public class StateMachineException extends RaftException {
- public StateMachineException(String serverId, Exception cause) {
+ public StateMachineException(String serverId, Throwable cause) {
super(cause.getClass().getName() + " from Server " + serverId, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 64c7a15..5cc93b3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -380,4 +380,15 @@ public abstract class RaftUtils {
throw new IllegalStateException(String.valueOf(message.get()));
}
}
+
+ public static Exception instantiateException(Class<? extends Exception> cls,
+ String message, Exception from) throws Exception {
+ Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+ cn.setAccessible(true);
+ Exception ex = cn.newInstance(message);
+ if (from != null) {
+ ex.initCause(from);
+ }
+ return ex;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index f17ee93..46cbe2b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.Locale;
public class StringUtils {
@@ -83,4 +85,12 @@ public class StringUtils {
return defaultValue;
}
}
+
+ public static String stringifyException(Throwable e) {
+ StringWriter stm = new StringWriter();
+ PrintWriter wrt = new PrintWriter(stm);
+ e.printStackTrace(wrt);
+ wrt.close();
+ return stm.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index e38e245..447f2ea 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -79,7 +79,7 @@ public class TestRaftStateMachineException {
fail("Exception expected");
} catch (StateMachineException e) {
e.printStackTrace();
- Assert.assertTrue(e.getMessage().contains("Fake Exception"));
+ Assert.assertTrue(e.getCause().getMessage().contains("Fake Exception"));
}
cluster.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index 52ed851..fb3cc11 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -21,29 +21,19 @@ import org.apache.ratis.shaded.io.grpc.Metadata;
import org.apache.ratis.shaded.io.grpc.Status;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.StringUtils;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
public class RaftGrpcUtil {
public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
- public static String stringifyException(Throwable e) {
- StringWriter stm = new StringWriter();
- PrintWriter wrt = new PrintWriter(stm);
- e.printStackTrace(wrt);
- wrt.close();
- return stm.toString();
- }
-
public static StatusRuntimeException wrapException(Throwable t) {
Metadata trailers = new Metadata();
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
return new StatusRuntimeException(
- Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)),
+ Status.INTERNAL.withDescription(StringUtils.stringifyException(t)),
trailers);
}
@@ -55,7 +45,7 @@ public class RaftGrpcUtil {
if (className != null) {
try {
Class<?> clazz = Class.forName(className);
- final Exception unwrapped = instantiateException(
+ final Exception unwrapped = RaftUtils.instantiateException(
clazz.asSubclass(Exception.class), status.getDescription(), se);
return RaftUtils.asIOException(unwrapped);
} catch (Exception e) {
@@ -76,12 +66,4 @@ public class RaftGrpcUtil {
return e;
}
- private static Exception instantiateException(Class<? extends Exception> cls,
- String message, Exception from) throws Exception {
- Constructor<? extends Exception> cn = cls.getConstructor(String.class);
- cn.setAccessible(true);
- Exception ex = cn.newInstance(message);
- ex.initCause(from);
- return ex;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
index 2d15cea..7a36fa1 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
@@ -17,9 +17,14 @@
*/
package org.apache.ratis.hadooprpc;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.log4j.Level;
import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.RaftClient;
import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+import org.apache.ratis.util.RaftUtils;
import java.io.IOException;
@@ -28,6 +33,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN
public class TestRaftReconfigurationWithHadoopRpc
extends RaftReconfigurationBaseTest {
+ static {
+ ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ERROR);
+ }
+
@Override
public MiniRaftCluster getCluster(int peerNum) throws IOException {
final Configuration hadoopConf = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 8c334dd..f8dcf62 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -154,13 +154,25 @@ message RaftClientRequestProto {
bool readOnly = 3;
}
+message NotLeaderExceptionProto {
+ RaftPeerProto suggestedLeader = 1;
+ repeated RaftPeerProto peersInConf = 2;
+}
+
+message StateMachineExceptionProto {
+ string exceptionClassName = 1;
+ string errorMsg = 2;
+ bytes stacktrace = 3;
+}
+
message RaftClientReplyProto {
RaftRpcReplyProto rpcReply = 1;
ClientMessageEntryProto message = 2;
- // the following 3 fields are used to indicate the server is not leader
- bool isNotLeader = 3;
- RaftPeerProto suggestedLeader = 4;
- repeated RaftPeerProto peersInConf = 5;
+
+ oneof ExceptionDetails {
+ NotLeaderExceptionProto notLeaderException = 3;
+ StateMachineExceptionProto stateMachineException = 4;
+ }
}
// setConfiguration request
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index c6b15e6..f6c9ade 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -516,8 +516,9 @@ public class LeaderState {
return pending;
}
- void replyPendingRequest(long logIndex, CompletableFuture<Message> message) {
- pendingRequests.replyPendingRequest(logIndex, message);
+ void replyPendingRequest(long logIndex,
+ CompletableFuture<Message> stateMachineFuture) {
+ pendingRequests.replyPendingRequest(logIndex, stateMachineFuture);
}
TransactionContext getTransactionContext(long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b5bb4b9..356ed4e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -206,7 +206,7 @@ public class LogAppender extends Daemon {
} catch (InterruptedIOException iioe) {
throw iioe;
} catch (IOException ioe) {
- LOG.debug(this + ": failed to send appendEntries; retry " + retry++, ioe);
+ LOG.trace(this + ": failed to send appendEntries; retry " + retry++, ioe);
}
if (isAppenderRunning()) {
leaderState.getSyncInterval().sleep();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index ff407e4..d4b74f2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -88,16 +88,21 @@ class PendingRequests {
return pendingRequest != null ? pendingRequest.getEntry() : null;
}
- void replyPendingRequest(long index, CompletableFuture<Message> messageFuture) {
+ void replyPendingRequest(long index,
+ CompletableFuture<Message> stateMachineFuture) {
final PendingRequest pending = pendingRequests.get(index);
if (pending != null) {
RaftUtils.assertTrue(pending.getIndex() == index);
- messageFuture.whenComplete((reply, exception) -> {
+ stateMachineFuture.whenComplete((reply, exception) -> {
if (exception == null) {
pending.setSuccessReply(reply);
} else {
- pending.setException(exception);
+ // the exception is coming from the state machine. wrap it into the
+ // reply as a StateMachineException
+ final StateMachineException e = new StateMachineException(
+ server.getId().toString(), exception);
+ pending.setReply(new RaftClientReply(pending.getRequest(), e));
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 897b2a8..ee74dc8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -434,8 +434,9 @@ public class RaftServerImpl implements RaftServer {
if (cause == null) {
throw new IOException(e);
}
- if (cause instanceof NotLeaderException) {
- return new RaftClientReply(request, (NotLeaderException)cause);
+ if (cause instanceof NotLeaderException ||
+ cause instanceof StateMachineException) {
+ return new RaftClientReply(request, (RaftException) cause);
} else {
throw RaftUtils.asIOException(cause);
}
@@ -797,9 +798,9 @@ public class RaftServerImpl implements RaftServer {
}
synchronized void replyPendingRequest(long logIndex,
- CompletableFuture<Message> message) {
+ CompletableFuture<Message> stateMachineFuture) {
if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logIndex, message);
+ leaderState.replyPendingRequest(logIndex, stateMachineFuture);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a25143ee/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 50aeae8..b4fc705 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -162,9 +162,9 @@ class StateMachineUpdater implements Runnable {
trx = stateMachine.applyTransactionSerial(trx);
// TODO: This step can be parallelized
- CompletableFuture<Message> messageFuture =
+ CompletableFuture<Message> stateMachineFuture =
stateMachine.applyTransaction(trx);
- server.replyPendingRequest(next.getIndex(), messageFuture);
+ server.replyPendingRequest(next.getIndex(), stateMachineFuture);
}
lastAppliedIndex++;
} else {