You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/05 00:56:17 UTC
incubator-ratis git commit: RATIS-162. Clean up reply exception
handling code for RATIS-140.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 1a74e13ea -> 8f0a3ed50
RATIS-162. Clean up reply exception handling code for RATIS-140.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8f0a3ed5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8f0a3ed5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8f0a3ed5
Branch: refs/heads/master
Commit: 8f0a3ed50aa7a347558551ab5ce3a8bc2e8e92a0
Parents: 1a74e13
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 5 08:55:56 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 5 08:55:56 2017 +0800
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 8 +-
.../org/apache/ratis/client/RaftClientRpc.java | 2 +-
.../ratis/client/impl/ClientImplUtils.java | 5 +-
.../ratis/client/impl/ClientProtoUtils.java | 9 ++-
.../ratis/client/impl/RaftClientImpl.java | 79 ++++++++++++--------
.../client/impl/RaftClientRpcWithProxy.java | 4 +-
.../ratis/protocol/LeaderNotReadyException.java | 4 +-
.../apache/ratis/protocol/RaftClientReply.java | 40 +++++-----
.../java/org/apache/ratis/protocol/RaftId.java | 5 +-
.../java/org/apache/ratis/util/JavaUtils.java | 32 ++++++++
.../java/org/apache/ratis/util/LogUtils.java | 10 ++-
.../ratis/util/UncheckedAutoCloseable.java | 27 +++++++
.../examples/filestore/FileStoreClient.java | 6 +-
.../TestRaftStateMachineException.java | 5 +-
.../org/apache/ratis/grpc/RaftGrpcUtil.java | 12 +--
.../ratis/grpc/client/AppendStreamer.java | 7 +-
.../apache/ratis/grpc/client/GrpcClientRpc.java | 45 +++++------
.../grpc/client/RaftClientProtocolClient.java | 22 ++++--
.../grpc/client/RaftClientProtocolProxy.java | 6 +-
.../ratis/server/impl/PendingRequests.java | 3 +-
.../ratis/server/impl/RaftServerImpl.java | 6 +-
.../apache/ratis/server/impl/RetryCache.java | 6 +-
.../ratis/server/impl/ServerProtoUtils.java | 2 +-
.../apache/ratis/server/impl/ServerState.java | 21 ++++--
.../java/org/apache/ratis/MiniRaftCluster.java | 9 +++
.../org/apache/ratis/RaftExceptionBaseTest.java | 13 ++--
.../impl/RaftReconfigurationBaseTest.java | 2 +-
.../server/simulation/SimulatedClientRpc.java | 2 +-
28 files changed, 242 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index d2c5c1a..89fb8f4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -21,9 +21,8 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.rpc.RpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +82,6 @@ public interface RaftClient extends Closeable {
private RaftGroup group;
private RaftPeerId leaderId;
private RaftProperties properties;
- private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
private Parameters parameters;
private Builder() {}
@@ -94,8 +92,6 @@ public interface RaftClient extends Closeable {
clientId = ClientId.randomId();
}
if (properties != null) {
- retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
-
if (clientRpc == null) {
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(parameters));
@@ -106,7 +102,7 @@ public interface RaftClient extends Closeable {
Objects.requireNonNull(group, "The 'group' field is not initialized."),
leaderId,
Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
- retryInterval, properties);
+ properties);
}
/** Set {@link RaftClient} ID. */
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 51f4430..c6c2d61 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -40,5 +40,5 @@ public interface RaftClientRpc extends Closeable {
void addServers(Iterable<RaftPeer> servers);
/** Handle the given exception. For example, try reconnecting. */
- void handleException(RaftPeerId serverId, Exception e);
+ void handleException(RaftPeerId serverId, Exception e, boolean shouldClose);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index e7c89f3..d813650 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -28,8 +28,7 @@ import org.apache.ratis.protocol.RaftPeerId;
/** Client utilities for internal use. */
public class ClientImplUtils {
public static RaftClient newRaftClient(ClientId clientId, RaftGroup group,
- RaftPeerId leaderId, RaftClientRpc clientRpc, TimeDuration retryInterval,
- RaftProperties properties) {
- return new RaftClientImpl(clientId, group, leaderId, clientRpc, retryInterval, properties);
+ RaftPeerId leaderId, RaftClientRpc clientRpc, RaftProperties properties) {
+ return new RaftClientImpl(clientId, group, leaderId, clientRpc, properties);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 2d7c13e..43c06de 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -109,8 +109,10 @@ public class ClientProtoUtils {
if (reply.getMessage() != null) {
b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
}
- if (reply.isNotLeader()) {
- NotLeaderException nle = reply.getNotLeaderException();
+
+ final NotLeaderException nle = reply.getNotLeaderException();
+ final StateMachineException sme;
+ if (nle != null) {
NotLeaderExceptionProto.Builder nleBuilder =
NotLeaderExceptionProto.newBuilder();
final RaftPeer suggestedLeader = nle.getSuggestedLeader();
@@ -120,8 +122,7 @@ public class ClientProtoUtils {
nleBuilder.addAllPeersInConf(
ProtoUtils.toRaftPeerProtos(Arrays.asList(nle.getPeers())));
b.setNotLeaderException(nleBuilder.build());
- } else if (reply.hasStateMachineException()) {
- StateMachineException sme = reply.getStateMachineException();
+ } else if ((sme = reply.getStateMachineException()) != null) {
StateMachineExceptionProto.Builder smeBuilder =
StateMachineExceptionProto.newBuilder();
final Throwable t = sme.getCause() != null ? sme.getCause() : sme;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 9c66a9f..ba1a107 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -32,6 +32,7 @@ import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -56,14 +57,15 @@ final class RaftClientImpl implements RaftClient {
private final Semaphore asyncRequestSemaphore;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
- RaftClientRpc clientRpc, TimeDuration retryInterval, RaftProperties properties) {
+ RaftClientRpc clientRpc, RaftProperties properties) {
this.clientId = clientId;
this.clientRpc = clientRpc;
this.peers = new ConcurrentLinkedQueue<>(group.getPeers());
this.groupId = group.getGroupId();
this.leaderId = leaderId != null? leaderId
: !peers.isEmpty()? peers.iterator().next().getId(): null;
- this.retryInterval = retryInterval;
+ this.retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
+
asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
clientRpc.addServers(peers);
@@ -101,12 +103,8 @@ final class RaftClientImpl implements RaftClient {
final long seqNum = nextSeqNum();
return sendRequestWithRetryAsync(
() -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly)
- ).thenApply(reply -> {
- if (reply.hasStateMachineException() || reply.hasGroupMismatchException()) {
- throw new CompletionException(reply.getException());
- }
- return reply;
- }).whenComplete((r, e) -> asyncRequestSemaphore.release());
+ ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
+ ).whenComplete((r, e) -> asyncRequestSemaphore.release());
}
@Override
@@ -203,15 +201,12 @@ final class RaftClientImpl implements RaftClient {
private CompletableFuture<RaftClientReply> sendRequestAsync(
RaftClientRequest request) {
- LOG.debug("{}: sendAsync {}", clientId, request);
+ LOG.debug("{}: send* {}", clientId, request);
return clientRpc.sendRequestAsync(request).thenApply(reply -> {
- LOG.debug("{}: receive {}", clientId, reply);
- if (reply != null && reply.isNotLeader()) {
- handleNotLeaderException(request, reply.getNotLeaderException());
- return null;
- }
- return reply;
+ LOG.debug("{}: receive* {}", clientId, reply);
+ return handleNotLeaderException(request, reply);
}).exceptionally(e -> {
+ LOG.debug("{}: Failed {} with {}", clientId, request, e);
final Throwable cause = e.getCause();
if (cause instanceof GroupMismatchException) {
return new RaftClientReply(request, (RaftException) cause);
@@ -233,26 +228,40 @@ final class RaftClientImpl implements RaftClient {
} catch (IOException ioe) {
handleIOException(request, ioe, null);
}
+ LOG.debug("{}: receive {}", clientId, reply);
+ reply = handleNotLeaderException(request, reply);
+ reply = handleStateMachineException(reply, Function.identity());
+ return reply;
+ }
+
+ static <E extends Throwable> RaftClientReply handleStateMachineException(
+ RaftClientReply reply, Function<StateMachineException, E> converter) throws E {
if (reply != null) {
- LOG.debug("{}: receive {}", clientId, reply);
- if (reply.isNotLeader()) {
- handleNotLeaderException(request, reply.getNotLeaderException());
- return null;
- } else if (reply.hasStateMachineException()) {
- throw reply.getStateMachineException();
- } else {
- return reply;
+ final StateMachineException sme = reply.getStateMachineException();
+ if (sme != null) {
+ throw converter.apply(sme);
}
}
- return null;
+ return reply;
}
- private void handleNotLeaderException(RaftClientRequest request,
- NotLeaderException nle) {
+ /**
+ * @return null if the reply is null or it has {@link NotLeaderException};
+ * otherwise return the same reply.
+ */
+ private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) {
+ if (reply == null) {
+ return null;
+ }
+ final NotLeaderException nle = reply.getNotLeaderException();
+ if (nle == null) {
+ return reply;
+ }
refreshPeers(Arrays.asList(nle.getPeers()));
final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
: nle.getSuggestedLeader().getId();
handleIOException(request, nle, newLeader);
+ return null;
}
private void refreshPeers(Collection<RaftPeer> newPeers) {
@@ -266,23 +275,29 @@ final class RaftClientImpl implements RaftClient {
private void handleIOException(RaftClientRequest request, IOException ioe,
RaftPeerId newLeader) {
- LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId,
- newLeader, ioe);
+ LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
+ clientId, newLeader, request, ioe);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace", new Throwable("TRACE"));
}
- final RaftPeerId oldLeader = request.getServerId();
- clientRpc.handleException(oldLeader, ioe);
+ if (ioe instanceof LeaderNotReadyException) {
+ return;
+ }
- if (newLeader == null && oldLeader.equals(leaderId)) {
+ final RaftPeerId oldLeader = request.getServerId();
+ final boolean stillLeader = oldLeader.equals(leaderId);
+ if (newLeader == null && stillLeader) {
newLeader = CollectionUtils.random(oldLeader,
CollectionUtils.as(peers, RaftPeer::getId));
}
- if (newLeader != null && oldLeader.equals(leaderId)) {
+
+ final boolean changeLeader = newLeader != null && stillLeader;
+ if (changeLeader) {
LOG.debug("{}: change Leader from {} to {}", clientId, oldLeader, newLeader);
this.leaderId = newLeader;
}
+ clientRpc.handleException(oldLeader, ioe, changeLeader);
}
void assertAsyncRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index 835a876..6f4f37f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -48,8 +48,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
}
@Override
- public void handleException(RaftPeerId serverId, Exception e) {
- if (ReflectionUtils.isInstance(e,
+ public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) {
+ if (shouldClose || ReflectionUtils.isInstance(e,
SocketException.class, SocketTimeoutException.class,
ClosedChannelException.class, EOFException.class)) {
proxies.resetProxy(serverId);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 33f6a4d..55af3f4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -24,8 +24,8 @@ package org.apache.ratis.protocol;
* it cannot determine whether a request is just a retry.
*/
public class LeaderNotReadyException extends RaftException {
- public LeaderNotReadyException() {
- this("The leader is not ready yet");
+ public LeaderNotReadyException(RaftPeerId id) {
+ this(id + " is in LEADER state but not ready yet.");
}
public LeaderNotReadyException(String msg) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index ea59352..77a987d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -17,6 +17,10 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReflectionUtils;
+
/**
* Reply from server to client
*/
@@ -40,6 +44,14 @@ public class RaftClientReply extends RaftClientMessage {
this.callId = callId;
this.message = message;
this.exception = exception;
+
+ if (exception != null) {
+ Preconditions.assertTrue(!success,
+ () -> "Inconsistent parameters: success && exception != null: " + this);
+ Preconditions.assertTrue(
+ ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
+ () -> "Unexpected exception class: " + this);
+ }
}
public RaftClientReply(RaftClientRequest request,
@@ -64,8 +76,8 @@ public class RaftClientReply extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", callId: " + getCallId()
- + ", success: " + isSuccess();
+ return super.toString() + ", cid=" + getCallId()
+ + ", success? " + isSuccess() + ", exception=" + exception;
}
public boolean isSuccess() {
@@ -76,29 +88,13 @@ public class RaftClientReply extends RaftClientMessage {
return message;
}
- public boolean isNotLeader() {
- return exception instanceof NotLeaderException;
- }
-
+ /** If this reply has {@link NotLeaderException}, return it; otherwise return null. */
public NotLeaderException getNotLeaderException() {
- assert isNotLeader();
- return (NotLeaderException) exception;
+ return JavaUtils.cast(exception, NotLeaderException.class);
}
+ /** If this reply has {@link StateMachineException}, return it; otherwise return null. */
public StateMachineException getStateMachineException() {
- assert hasStateMachineException();
- return (StateMachineException) exception;
- }
-
- public boolean hasStateMachineException() {
- return exception instanceof StateMachineException;
- }
-
- public boolean hasGroupMismatchException(){
- return exception instanceof GroupMismatchException;
- }
-
- public RaftException getException(){
- return exception;
+ return JavaUtils.cast(exception, StateMachineException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index 0846856..4b45765 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -67,8 +67,11 @@ public abstract class RaftId {
this(toUuid(uuidBytes), () -> uuidBytes);
}
+ /** @return the last 12 hex digits. */
String createUuidString(UUID uuid) {
- return uuid.toString().toUpperCase();
+ final String s = uuid.toString().toUpperCase();
+ final int i = s.lastIndexOf('-');
+ return s.substring(i + 1);
}
public ByteString toByteString() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 89407eb..e910f28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -27,8 +27,12 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Objects;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -40,6 +44,15 @@ public interface JavaUtils {
Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
/**
+ * The same as {@link Class#cast(Object)} except that
+ * this method returns null (instead of throwing {@link ClassCastException})
+ * if the given object is not an instance of the given class.
+ */
+ static <T> T cast(Object obj, Class<T> clazz) {
+ return clazz.isInstance(obj)? clazz.cast(obj): null;
+ }
+
+ /**
* Invoke {@link Callable#call()} and, if there is any,
* wrap the checked exception in a {@link RuntimeException}.
*/
@@ -171,6 +184,20 @@ public interface JavaUtils {
}
}
+ Supplier<Timer> TIMER = memoize(() -> new Timer(true));
+
+ static UncheckedAutoCloseable runRepeatedly(Runnable runnable, long delay, long period, TimeUnit unit) {
+ final Timer timer = TIMER.get();
+ timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ runnable.run();
+ }
+ }, unit.toMillis(delay), unit.toMillis(period));
+
+ return timer::cancel;
+ }
+
static void dumpAllThreads(Consumer<String> println) {
final ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
for (ThreadInfo ti : threadMxBean.dumpAllThreads(true, true)) {
@@ -183,4 +210,9 @@ public interface JavaUtils {
future.completeExceptionally(t);
return future;
}
+
+ static Throwable unwrapCompletionException(Throwable t) {
+ Objects.requireNonNull(t, "t == null");
+ return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index 6a4d833..e75b89b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -23,6 +23,7 @@ package org.apache.ratis.util;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -32,9 +33,16 @@ import java.util.function.Supplier;
* Logging (as in log4j) related utility methods.
*/
public interface LogUtils {
+ Logger LOG = LoggerFactory.getLogger(LogUtils.class);
static void setLogLevel(Logger logger, Level level) {
- LogManager.getLogger(logger.getName()).setLevel(level);
+ final String name = logger.getName();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("", new Throwable("Set " + name + " log level to " + level));
+ } else {
+ LOG.info("Set {} log level to {}", name, level);
+ }
+ LogManager.getLogger(name).setLevel(level);
}
static <THROWABLE extends Throwable> void runAndLog(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
new file mode 100644
index 0000000..cc73159
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/UncheckedAutoCloseable.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * The same as {@link AutoCloseable}
+ * except that the close method does not throw {@link Exception}.
+ */
+public interface UncheckedAutoCloseable extends AutoCloseable {
+ @Override
+ void close();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 59d5079..380f315 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -22,6 +22,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.ExamplesProtos.*;
import org.apache.ratis.util.CheckedFunction;
@@ -57,8 +58,9 @@ public class FileStoreClient implements Closeable {
ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
throws IOException {
final RaftClientReply reply = sendFunction.apply(() -> request);
- if (reply.hasStateMachineException()) {
- throw new IOException("Failed to send request " + request, reply.getStateMachineException());
+ final StateMachineException sme = reply.getStateMachineException();
+ if (sme != null) {
+ throw new IOException("Failed to send request " + request, sme);
}
Preconditions.assertTrue(reply.isSuccess(), () -> "reply=" + reply);
return reply.getMessage().getContent();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index 158875d..b0e2b7c 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -35,6 +35,7 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Collection;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.fail;
@@ -146,7 +147,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
- Assert.assertTrue(reply.hasStateMachineException());
+ Objects.requireNonNull(reply.getStateMachineException());
RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(
cluster.getLeader(), client.getId(), callId);
@@ -155,7 +156,7 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
// retry
reply = rpc.sendRequest(r);
- Assert.assertTrue(reply.hasStateMachineException());
+ Objects.requireNonNull(reply.getStateMachineException());
RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
cluster.getLeader(), client.getId(), callId);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index d373ddb..5499878 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -22,10 +22,7 @@ import org.apache.ratis.shaded.io.grpc.Metadata;
import org.apache.ratis.shaded.io.grpc.Status;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.util.CheckedSupplier;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.ReflectionUtils;
-import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.*;
import java.io.IOException;
import java.util.Objects;
@@ -38,12 +35,7 @@ public interface RaftGrpcUtil {
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
static StatusRuntimeException wrapException(Throwable t) {
- Objects.requireNonNull(t, "t == null");
- if (t instanceof CompletionException) {
- if (t.getCause() != null) {
- t = t.getCause();
- }
- }
+ t = JavaUtils.unwrapCompletionException(t);
Metadata trailers = new Metadata();
trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 9c238f4..810121b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -97,7 +97,7 @@ public class AppendStreamer implements Closeable {
this.peers = group.getPeers().stream().collect(
Collectors.toMap(RaftPeer::getId, Function.identity()));
proxyMap = new PeerProxyMap<>(clientId.toString(),
- raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new));
+ raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new));
proxyMap.addPeers(group.getPeers());
refreshLeaderProxy(leaderId, null);
@@ -277,10 +277,11 @@ public class AppendStreamer implements Closeable {
} else {
// this may be a NotLeaderException
RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
- if (r.isNotLeader()) {
+ final NotLeaderException nle = r.getNotLeaderException();
+ if (nle != null) {
LOG.debug("{} received a NotLeaderException from {}", this,
r.getServerId());
- handleNotLeader(r.getNotLeaderException(), targetId);
+ handleNotLeader(nle, targetId);
}
}
AppendStreamer.this.notifyAll();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 2b7de70..c5c188e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -23,13 +23,13 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.PeerProxyMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +43,13 @@ import static org.apache.ratis.client.impl.ClientProtoUtils.*;
public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
+
+ private final ClientId clientId;
private final int maxMessageSize;
public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
- super(new PeerProxyMap<>(clientId.toString(), RaftClientProtocolClient::new));
+ super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p)));
+ this.clientId = clientId;
maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
}
@@ -57,10 +60,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
try {
return sendRequestAsync(request, getProxies().getProxy(serverId));
} catch (IOException e) {
- final CompletableFuture<RaftClientReply> replyFuture =
- new CompletableFuture<>();
- replyFuture.completeExceptionally(e);
- return replyFuture;
+ return JavaUtils.completeExceptionally(e);
}
}
@@ -83,16 +83,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
return ClientProtoUtils.toServerInformationReply(
proxy.serverInformation(proto));
} else {
- RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
- if (requestProto.getSerializedSize() > maxMessageSize) {
- throw new IOException("msg size:" + requestProto.getSerializedSize() +
- " exceeds maximum:" + maxMessageSize);
- }
- final CompletableFuture<RaftClientReply> replyFuture =
- sendRequestAsync(request, proxy);
+ final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy);
// TODO: timeout support
try {
- return replyFuture.get();
+ return f.get();
} catch (InterruptedException e) {
throw new InterruptedIOException(
"Interrupted while waiting for response of request " + request);
@@ -103,7 +97,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
}
private CompletableFuture<RaftClientReply> sendRequestAsync(
- RaftClientRequest request, RaftClientProtocolClient proxy) {
+ RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
final RaftClientRequestProto requestProto =
toRaftClientRequestProto(request);
final CompletableFuture<RaftClientReplyProto> replyFuture =
@@ -117,22 +111,14 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
@Override
public void onError(Throwable t) {
- // This implementation is used as RaftClientRpc. Retry
- // logic on Exception is in RaftClient.
- final IOException e;
- if (t instanceof StatusRuntimeException) {
- e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
- } else {
- e = IOUtils.asIOException(t);
- }
- replyFuture.completeExceptionally(e);
+ replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t));
}
@Override
public void onCompleted() {
if (!replyFuture.isDone()) {
replyFuture.completeExceptionally(
- new IOException("No reply for request " + request));
+ new IOException(clientId + ": Stream completed but no reply for request " + request));
}
}
});
@@ -141,4 +127,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
return replyFuture.thenApply(replyProto -> toRaftClientReply(replyProto));
}
+
+ RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) throws IOException {
+ final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
+ if (proto.getSerializedSize() > maxMessageSize) {
+ throw new IOException(clientId + ": Message size:" + proto.getSerializedSize()
+ + " exceeds maximum:" + maxMessageSize);
+ }
+ return proto;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 12a3a5b..ace90f2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -18,35 +18,39 @@
package org.apache.ratis.grpc.client;
import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
+import java.util.function.Supplier;
public class RaftClientProtocolClient implements Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class);
+
+ private final Supplier<String> name;
private final RaftPeer target;
private final ManagedChannel channel;
private final RaftClientProtocolServiceBlockingStub blockingStub;
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
- public RaftClientProtocolClient(RaftPeer target) {
+ public RaftClientProtocolClient(ClientId id, RaftPeer target) {
+ this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
this.target = target;
channel = ManagedChannelBuilder.forTarget(target.getAddress())
.usePlaintext(true).build();
@@ -55,6 +59,10 @@ public class RaftClientProtocolClient implements Closeable {
adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
}
+ String getName() {
+ return name.get();
+ }
+
@Override
public void close() {
channel.shutdownNow();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
index 6892c71..297fe26 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.grpc.client;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
@@ -31,9 +32,10 @@ public class RaftClientProtocolProxy implements Closeable {
private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
private RpcSession currentSession;
- public RaftClientProtocolProxy(RaftPeer target,
+ public RaftClientProtocolProxy(
+ ClientId clientId, RaftPeer target,
Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
- proxy = new RaftClientProtocolClient(target);
+ proxy = new RaftClientProtocolClient(clientId, target);
this.responseHandlerCreation = responseHandlerCreation;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index f651230..1dead9d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
@@ -29,7 +30,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
class PendingRequests {
- private static final Logger LOG = RaftServerImpl.LOG;
+ public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);
private static class RequestMap {
private final Object name;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 122ff51..bc45b5f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -367,7 +367,7 @@ public class RaftServerImpl implements RaftServerProtocol,
return RetryCache.failWithReply(reply, entry);
} else {
if (leaderState == null || !leaderState.isReady()) {
- return RetryCache.failWithException(new LeaderNotReadyException(), entry);
+ return RetryCache.failWithException(new LeaderNotReadyException(getId()), entry);
}
}
return null;
@@ -732,8 +732,8 @@ public class RaftServerImpl implements RaftServerProtocol,
final AppendEntriesReplyProto reply = ServerProtoUtils.toAppendEntriesReplyProto(
leaderId, getId(), groupId, currentTerm, nextIndex, NOT_LEADER);
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: do not recognize leader. Reply: {}",
- getId(), ProtoUtils.toString(reply));
+ LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
+ getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply));
}
return reply;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
index afe2a7e..5abfc1a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.shaded.com.google.common.cache.Cache;
import org.apache.ratis.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -243,10 +244,7 @@ public class RetryCache implements Closeable {
entry.failWithException(t);
return entry.getReplyFuture();
} else {
- final CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
- future.completeExceptionally(t);
- return future;
+ return JavaUtils.completeExceptionally(t);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 9f84e05..3e44c76 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -64,7 +64,7 @@ public class ServerProtoUtils {
final ByteString clientId = entry.getClientId();
return toTermIndexString(entry) + entry.getLogEntryBodyCase()
+ ", " + (clientId.isEmpty()? "<empty clientId>": ClientId.valueOf(clientId))
- + ", callId=" + entry.getCallId();
+ + ", cid=" + entry.getCallId();
}
public static String toString(LogEntryProto... entries) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e83a931..a51d4b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -35,8 +35,10 @@ import org.apache.ratis.util.ProtoUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.Objects;
import java.util.function.Consumer;
+import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
/**
@@ -169,7 +171,7 @@ public class ServerState implements Closeable {
if (newTerm > currentTerm) {
currentTerm = newTerm;
votedFor = null;
- leaderId = null;
+ setLeader(null);
return true;
}
return false;
@@ -188,7 +190,7 @@ public class ServerState implements Closeable {
*/
long initElection() {
votedFor = selfId;
- leaderId = null;
+ setLeader(null);
return ++currentTerm;
}
@@ -201,15 +203,18 @@ public class ServerState implements Closeable {
*/
void grantVote(RaftPeerId candidateId) {
votedFor = candidateId;
- leaderId = null;
+ setLeader(null);
}
- void setLeader(RaftPeerId leaderId) {
- this.leaderId = leaderId;
+ void setLeader(RaftPeerId newLeaderId) {
+ if (!Objects.equals(leaderId, newLeaderId)) {
+ LOG.info("{}: change Leader from {} to {}", selfId, leaderId, newLeaderId);
+ leaderId = newLeaderId;
+ }
}
void becomeLeader() {
- leaderId = selfId;
+ setLeader(selfId);
}
public RaftLog getLog() {
@@ -280,7 +285,7 @@ public class ServerState implements Closeable {
public void setRaftConf(long logIndex, RaftConfiguration conf) {
configurationManager.addConfiguration(logIndex, conf);
- RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
+ LOG.info("{}: successfully update the configuration {}",
getSelfId(), conf);
}
@@ -313,7 +318,7 @@ public class ServerState implements Closeable {
@Override
public void close() throws IOException {
stateMachineUpdater.stop();
- RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
+ LOG.info("{} closes. The last applied log index is {}",
getSelfId(), getLastAppliedIndex());
log.close();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 88fddf4..b50d847 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -145,6 +145,7 @@ public abstract class MiniRaftCluster {
this.properties = new RaftProperties(properties);
this.parameters = parameters;
+ JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT" + printServers()), 10, 10, TimeUnit.SECONDS);
ExitUtils.disableSystemExit();
}
@@ -470,6 +471,14 @@ public abstract class MiniRaftCluster {
return createClient(null, g);
}
+ public RaftClient createClientWithLeader() {
+ return createClient(getLeader().getId(), group);
+ }
+
+ public RaftClient createClientWithFollower() {
+ return createClient(getFollowers().get(0).getId(), group);
+ }
+
public RaftClient createClient(RaftPeerId leaderId) {
return createClient(leaderId, group);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 4672b9d..7ad5bd2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Objects;
public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -103,9 +104,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
}
Assert.assertNotNull(reply);
Assert.assertFalse(reply.isSuccess());
- Assert.assertTrue(reply.isNotLeader());
- Assert.assertEquals(newLeader,
- reply.getNotLeaderException().getSuggestedLeader().getId());
+ final NotLeaderException nle = reply.getNotLeaderException();
+ Objects.requireNonNull(nle);
+ Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId());
reply = client.send(new SimpleMessage("m3"));
Assert.assertTrue(reply.isSuccess());
@@ -148,9 +149,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
}
Assert.assertNotNull(reply);
Assert.assertFalse(reply.isSuccess());
- Assert.assertTrue(reply.isNotLeader());
- Assert.assertEquals(newLeader,
- reply.getNotLeaderException().getSuggestedLeader().getId());
+ final NotLeaderException nle = reply.getNotLeaderException();
+ Objects.requireNonNull(nle);
+ Assert.assertEquals(newLeader, nle.getSuggestedLeader().getId());
Collection<RaftPeer> peers = cluster.getPeers();
RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
Assert.assertEquals(peers.size(), peersFromReply.length);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 1fccfc4..9b142fd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
final RaftClientRpc sender = client.getClientRpc();
RaftClientReply reply = sender.sendRequest(cluster.newSetConfigurationRequest(
client.getId(), leaderId, change.allPeersInNewConf));
- if (reply.isNotLeader()) {
+ if (reply.getNotLeaderException() != null) {
gotNotLeader.set(true);
}
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8f0a3ed5/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index 386d2e7..a141789 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -36,7 +36,7 @@ class SimulatedClientRpc
}
@Override
- public void handleException(RaftPeerId serverId, Exception e) {
+ public void handleException(RaftPeerId serverId, Exception e, boolean shouldClose) {
// do nothing
}