You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/19 07:17:51 UTC
[incubator-ratis] branch master updated: RATIS-499. Leader should
close the sliding window if it steps down.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 74b491c RATIS-499. Leader should close the sliding window if it steps down.
74b491c is described below
commit 74b491cc76aa5e26029c3f6d77c1210809ecc110
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 19 15:17:18 2019 +0800
RATIS-499. Leader should close the sliding window if it steps down.
---
.../ratis/client/impl/RaftClientTestUtil.java | 11 +-
.../apache/ratis/protocol/RaftClientRequest.java | 2 +-
.../org/apache/ratis/util/CollectionUtils.java | 41 ++++---
.../grpc/client/GrpcClientProtocolClient.java | 22 ++--
.../ratis/grpc/client/GrpcClientProtocolProxy.java | 2 +-
.../grpc/client/GrpcClientProtocolService.java | 81 ++++++++++---
.../apache/ratis/grpc/client/GrpcClientRpc.java | 12 +-
.../org/apache/ratis/grpc/server/GrpcService.java | 23 ++--
ratis-proto/src/main/proto/Grpc.proto | 8 +-
.../org/apache/ratis/server/RaftServerRpc.java | 6 +-
.../org/apache/ratis/server/impl/LeaderState.java | 1 +
.../org/apache/ratis/server/impl/RoleInfo.java | 4 +
.../ratis/server/impl/RaftServerTestUtil.java | 7 +-
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 135 +++++++++++++++++++--
14 files changed, 277 insertions(+), 78 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 20647de..7426d32 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,10 @@
package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
/** Interface for testing raft client. */
public interface RaftClientTestUtil {
@@ -33,4 +37,9 @@ public interface RaftClientTestUtil {
static long getCallId(RaftClient client) {
return ((RaftClientImpl) client).getCallId();
}
+
+ static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId server,
+ long callId, Message message, RaftClientRequest.Type type, SlidingWindowEntry slidingWindowEntry) {
+ return ((RaftClientImpl)client).newRaftClientRequest(server, callId, message, type, slidingWindowEntry);
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index a007e83..18253fa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -199,7 +199,7 @@ public class RaftClientRequest extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry)
+ return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
+ type + ", " + getMessage();
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index a215d3d..39d9908 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -1,23 +1,20 @@
/*
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.ratis.util;
import java.util.*;
@@ -91,16 +88,22 @@ public interface CollectionUtils {
return as(Arrays.asList(array), converter);
}
- static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
+ static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) {
final V returned = map.put(key, value);
Preconditions.assertTrue(returned == null,
() -> "Entry already exists for key " + key + " in map " + name.get());
return value;
}
- static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<String> name) {
+ static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<Object> name) {
final boolean replaced = map.replace(key, oldValue, newValue);
Preconditions.assertTrue(replaced,
() -> "Entry not found for key " + key + " in map " + name.get());
}
+
+ static <K, V> void removeExisting(K key, V value, Map<K, V> map, Supplier<Object> name) {
+ final boolean removed = map.remove(key, value);
+ Preconditions.assertTrue(removed,
+ () -> "Entry not found for key " + key + " in map " + name.get());
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 0be963f..2ed5df0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -84,7 +84,7 @@ public class GrpcClientProtocolClient implements Closeable {
private final RaftClientProtocolServiceStub asyncStub;
private final AdminProtocolServiceBlockingStub adminBlockingStub;
- private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+ private final AtomicReference<AsyncStreamObservers> orderedStreamObservers = new AtomicReference<>();
private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
@@ -128,7 +128,7 @@ public class GrpcClientProtocolClient implements Closeable {
@Override
public void close() {
- Optional.ofNullable(appendStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+ Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
channel.shutdownNow();
}
@@ -169,20 +169,18 @@ public class GrpcClientProtocolClient implements Closeable {
}
}
- StreamObserver<RaftClientRequestProto> append(
- StreamObserver<RaftClientReplyProto> responseHandler) {
- return asyncStub.append(responseHandler);
+ StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseHandler) {
+ return asyncStub.ordered(responseHandler);
}
- StreamObserver<RaftClientRequestProto> appendWithTimeout(
- StreamObserver<RaftClientReplyProto> responseHandler) {
+ StreamObserver<RaftClientRequestProto> orderedWithTimeout(StreamObserver<RaftClientReplyProto> responseHandler) {
return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
- .append(responseHandler);
+ .ordered(responseHandler);
}
- AsyncStreamObservers getAppendStreamObservers() {
- return appendStreamObservers.updateAndGet(
- a -> a != null? a : new AsyncStreamObservers(appendStreamObservers, this::append));
+ AsyncStreamObservers getOrderedStreamObservers() {
+ return orderedStreamObservers.updateAndGet(
+ a -> a != null? a : new AsyncStreamObservers(orderedStreamObservers, this::ordered));
}
AsyncStreamObservers getUnorderedAsyncStreamObservers() {
@@ -329,7 +327,7 @@ public class GrpcClientProtocolClient implements Closeable {
final CompletableFuture<RaftClientReply> f = entry.getValue();
if (!f.isDone()) {
f.completeExceptionally(t != null? t
- : new IOException(getName() + ": Stream " + event
+ : new AlreadyClosedException(getName() + ": Stream " + event
+ ": no reply for async request cid=" + entry.getKey()));
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
index 3a3941b..666b1c3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -84,7 +84,7 @@ public class GrpcClientProtocolProxy implements Closeable {
RpcSession(CloseableStreamObserver responseHandler) {
this.responseHandler = responseHandler;
- this.requestObserver = proxy.append(responseHandler);
+ this.requestObserver = proxy.ordered(responseHandler);
}
void onError() {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 2883795..b0a7578 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SlidingWindow;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -44,11 +46,11 @@ import java.util.function.Supplier;
public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
- private static class PendingAppend implements SlidingWindow.ServerSideRequest<RaftClientReply> {
+ private static class PendingOrderedRequest implements SlidingWindow.ServerSideRequest<RaftClientReply> {
private final RaftClientRequest request;
private volatile RaftClientReply reply;
- PendingAppend(RaftClientRequest request) {
+ PendingOrderedRequest(RaftClientRequest request) {
this.request = request;
}
@@ -85,11 +87,34 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
return request != null? getSeqNum() + ":" + reply: "COMPLETED";
}
}
- private static final PendingAppend COMPLETED = new PendingAppend(null);
+ private static final PendingOrderedRequest COMPLETED = new PendingOrderedRequest(null);
+
+ static class OrderedStreamObservers {
+ private final Map<Integer, OrderedRequestStreamObserver> map = new ConcurrentHashMap<>();
+
+ void putNew(OrderedRequestStreamObserver so) {
+ CollectionUtils.putNew(so.getId(), so, map, () -> getClass().getSimpleName());
+ }
+
+ void removeExisting(OrderedRequestStreamObserver so) {
+ CollectionUtils.removeExisting(so.getId(), so, map, () -> getClass().getSimpleName());
+ }
+
+ void closeAllExisting() {
+ // Iteration not synchronized:
+ // Okay if an existing object is removed by other means during the iteration since it must already be closed.
+ // Also okay if a new object is added during the iteration since this method closes only the existing objects.
+ for(OrderedRequestStreamObserver so : map.values()) {
+ so.close();
+ }
+ }
+ }
private final Supplier<RaftPeerId> idSupplier;
private final RaftClientAsynchronousProtocol protocol;
+ private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers();
+
public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
this.idSupplier = idSupplier;
this.protocol = protocol;
@@ -108,9 +133,15 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
@Override
- public StreamObserver<RaftClientRequestProto> append(
- StreamObserver<RaftClientReplyProto> responseObserver) {
- return new AppendRequestStreamObserver(responseObserver);
+ public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
+ final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
+ orderedStreamObservers.putNew(so);
+ return so;
+ }
+
+ public void closeAllOrderedRequestStreamObservers() {
+ LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
+ orderedStreamObservers.closeAllExisting();
}
@Override
@@ -121,7 +152,8 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
private final AtomicInteger streamCount = new AtomicInteger();
private abstract class RequestStreamObserver implements StreamObserver<RaftClientRequestProto> {
- private final String name = getId() + "-" + getClass().getSimpleName() + streamCount.getAndIncrement();
+ private final int id = streamCount.getAndIncrement();
+ private final String name = getId() + "-" + getClass().getSimpleName() + id;
private final StreamObserver<RaftClientReplyProto> responseObserver;
private final AtomicBoolean isClosed = new AtomicBoolean();
@@ -130,6 +162,10 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
this.responseObserver = responseObserver;
}
+ int getId() {
+ return id;
+ }
+
String getName() {
return name;
}
@@ -139,11 +175,25 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
synchronized void responseCompleted() {
- responseObserver.onCompleted();
+ try {
+ responseObserver.onCompleted();
+ } catch(Exception e) {
+ // response stream may possibly be already closed/failed so that the exception can be safely ignored.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getName() + ": Failed onCompleted, exception is ignored", e);
+ }
+ }
}
synchronized void responseError(Throwable t) {
- responseObserver.onError(t);
+ try {
+ responseObserver.onError(t);
+ } catch(Exception e) {
+ // response stream may possibly be already closed/failed so that the exception can be safely ignored.
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getName() + ": Failed onError, exception is ignored", e);
+ }
+ }
}
@@ -243,15 +293,15 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
}
}
- private class AppendRequestStreamObserver extends RequestStreamObserver {
- private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+ private class OrderedRequestStreamObserver extends RequestStreamObserver {
+ private final SlidingWindow.Server<PendingOrderedRequest, RaftClientReply> slidingWindow
= new SlidingWindow.Server<>(getName(), COMPLETED);
- AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+ OrderedRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
super(responseObserver);
}
- void processClientRequest(PendingAppend pending) {
+ void processClientRequest(PendingOrderedRequest pending) {
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequest(),
reply -> slidingWindow.receiveReply(seq, reply, this::sendReply, this::processClientRequest));
@@ -259,10 +309,10 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
@Override
void processClientRequest(RaftClientRequest r) {
- slidingWindow.receivedRequest(new PendingAppend(r), this::processClientRequest);
+ slidingWindow.receivedRequest(new PendingOrderedRequest(r), this::processClientRequest);
}
- private void sendReply(PendingAppend ready) {
+ private void sendReply(PendingOrderedRequest ready) {
Preconditions.assertTrue(ready.hasReply());
if (ready == COMPLETED) {
close();
@@ -291,6 +341,7 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
LOG.debug("{}: close", getName());
responseCompleted();
slidingWindow.close();
+ orderedStreamObservers.removeExisting(this);
}
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index a63c2af..deaaac0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -65,7 +65,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
try {
final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
// Reuse the same grpc stream for all async calls.
- return proxy.getAppendStreamObservers().onNext(request);
+ return proxy.getOrderedStreamObservers().onNext(request);
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
@@ -113,6 +113,9 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
throw new InterruptedIOException(
"Interrupted while waiting for response of request " + request);
} catch (ExecutionException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(clientId + ": failed " + request, e);
+ }
throw IOUtils.toIOException(e);
}
}
@@ -122,11 +125,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
final RaftClientRequestProto requestProto =
toRaftClientRequestProto(request);
- final CompletableFuture<RaftClientReplyProto> replyFuture =
- new CompletableFuture<>();
+ final CompletableFuture<RaftClientReplyProto> replyFuture = new CompletableFuture<>();
// create a new grpc stream for each non-async call.
final StreamObserver<RaftClientRequestProto> requestObserver =
- proxy.appendWithTimeout(new StreamObserver<RaftClientReplyProto>() {
+ proxy.orderedWithTimeout(new StreamObserver<RaftClientReplyProto>() {
@Override
public void onNext(RaftClientReplyProto value) {
replyFuture.complete(value);
@@ -141,7 +143,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
public void onCompleted() {
if (!replyFuture.isDone()) {
replyFuture.completeExceptionally(
- new IOException(clientId + ": Stream completed but no reply for request " + request));
+ new AlreadyClosedException(clientId + ": Stream completed but no reply for request " + request));
}
}
});
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 5139aff..de65acc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -81,6 +81,8 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
private final Server server;
private final Supplier<InetSocketAddress> addressSupplier;
+ private final GrpcClientProtocolService clientProtocolService;
+
private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
this(server, server::getId,
GrpcConfigKeys.Server.port(server.getProperties()),
@@ -103,11 +105,13 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
}
+ this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer);
+
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
.maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
.flowControlWindow(flowControlWindow.getSizeInt())
.addService(new GrpcServerProtocolService(idSupplier, raftServer))
- .addService(new GrpcClientProtocolService(idSupplier, raftServer))
+ .addService(clientProtocolService)
.addService(new GrpcAdminProtocolService(raftServer));
if (tlsConfig != null) {
@@ -159,22 +163,25 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
}
@Override
+ public void notifyNotLeader() {
+ clientProtocolService.closeAllOrderedRequestStreamObservers();
+ }
+
+ @Override
public InetSocketAddress getInetSocketAddress() {
return addressSupplier.get();
}
@Override
- public AppendEntriesReplyProto appendEntries(
- AppendEntriesRequestProto request) throws IOException {
+ public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
throw new UnsupportedOperationException(
- "Blocking AppendEntries call is not supported");
+ "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
}
@Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
+ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
throw new UnsupportedOperationException(
- "Blocking InstallSnapshot call is not supported");
+ "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
}
@Override
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index f177609..497e3fd 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,11 +28,11 @@ service RaftClientProtocolService {
rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
returns(ratis.common.RaftClientReplyProto) {}
- // A client-to-server stream RPC to append data
- rpc append(stream ratis.common.RaftClientRequestProto)
+ // A client-to-server stream RPC for ordered async requests
+ rpc ordered(stream ratis.common.RaftClientRequestProto)
returns (stream ratis.common.RaftClientReplyProto) {}
- // A client-to-server stream RPC for unordered async requested
+ // A client-to-server stream RPC for unordered async requests
rpc unordered(stream ratis.common.RaftClientRequestProto)
returns (stream ratis.common.RaftClientReplyProto) {}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index a85e606..c37433e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -62,4 +62,8 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl
/** Handle the given exception. For example, try reconnecting. */
void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
+
+ /** The server role changes from leader to a non-leader role. */
+ default void notifyNotLeader() {
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8ece134..e828fd8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -246,6 +246,7 @@ public class LeaderState {
} catch (IOException e) {
LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e);
}
+ server.getServerRpc().notifyNotLeader();
}
void notifySenders() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 88d525c..52f4b81 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -53,6 +53,10 @@ class RoleInfo {
this.transitionTime = new AtomicReference<>(Timestamp.currentTime());
}
+ RaftPeerRole getRaftPeerRole() {
+ return role;
+ }
+
void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.transitionTime.set(Timestamp.currentTime());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index ea9b9a2..676713f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@@ -93,6 +94,10 @@ public class RaftServerTestUtil {
return entry.isFailed();
}
+ public static RaftPeerRole getRole(RaftServerImpl server) {
+ return server.getRole().getRaftPeerRole();
+ }
+
public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
return server.getRole().getLeaderState().map(LeaderState::getLogAppenders).orElse(null);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 2ec7ae8..177e0f7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,32 +17,74 @@
*/
package org.apache.ratis.grpc;
+import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
-public class TestRaftServerWithGrpc extends BaseTest {
+public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
+ {
+ LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
+ LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
+ }
+
+ @Before
+ public void setup() {
+ final RaftProperties p = getProperties();
+ p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+ RaftClientConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+ }
@Test
public void testServerRestartOnException() throws Exception {
- RaftProperties properties = new RaftProperties();
- final MiniRaftClusterWithGrpc cluster
- = MiniRaftClusterWithGrpc.FACTORY.newCluster(1, properties);
- cluster.start();
- RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
- GrpcConfigKeys.Server.setPort(properties, cluster.getLeader().getServerRpc().getInetSocketAddress().getPort());
+ runWithNewCluster(1, this::runTestServerRestartOnException);
+ }
+
+ void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = leader.getId();
+
+ final RaftProperties p = getProperties();
+ GrpcConfigKeys.Server.setPort(p, leader.getServerRpc().getInetSocketAddress().getPort());
+
// Create a raft server proxy with server rpc bound to a different address
// compared to leader. This helps in locking the raft storage directory to
// be used by next raft server proxy instance.
final StateMachine stateMachine = cluster.getLeader().getStateMachine();
- ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null);
+ ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null);
// Close the server rpc for leader so that new raft server can be bound to it.
cluster.getLeader().getServerRpc().close();
@@ -51,9 +93,82 @@ public class TestRaftServerWithGrpc extends BaseTest {
// the raft server proxy created earlier. Raft server proxy should close
// the rpc server on failure.
testFailureCase("start a new server with the same address",
- () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null).start(),
+ () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null).start(),
IOException.class, OverlappingFileLockException.class);
// Try to start a raft server rpc at the leader address.
cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
}
+
+ @Test
+ public void testUnsupportedMethods() throws Exception {
+ runWithNewCluster(1, this::runTestUnsupportedMethods);
+ }
+
+ void runTestUnsupportedMethods(MiniRaftClusterWithGrpc cluster) throws Exception {
+ final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ final RaftServerRpc rpc = cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+
+ testFailureCase("appendEntries",
+ () -> rpc.appendEntries(null),
+ UnsupportedOperationException.class);
+
+ testFailureCase("installSnapshot",
+ () -> rpc.installSnapshot(null),
+ UnsupportedOperationException.class);
+ }
+
+ @Test
+ public void testLeaderRestart() throws Exception {
+ runWithNewCluster(3, this::runTestLeaderRestart);
+ }
+
+ void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+ try (final RaftClient client = cluster.createClient()) {
+ // send a request to make sure leader is ready
+ final CompletableFuture<RaftClientReply> f = client.sendAsync(new SimpleMessage("testing"));
+ Assert.assertTrue(f.get().isSuccess());
+ }
+
+ try (final RaftClient client = cluster.createClient()) {
+ final RaftClientRpc rpc = client.getClientRpc();
+
+ final AtomicLong seqNum = new AtomicLong();
+ {
+ // send a request using rpc directly
+ final RaftClientRequest request = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+ final CompletableFuture<RaftClientReply> f = rpc.sendRequestAsync(request);
+ Assert.assertTrue(f.get().isSuccess());
+ }
+
+ // send another request which will be blocked
+ final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(leader);
+ stateMachine.blockStartTransaction();
+ final RaftClientRequest requestBlocked = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+ final CompletableFuture<RaftClientReply> futureBlocked = rpc.sendRequestAsync(requestBlocked);
+
+ // change leader
+ RaftTestUtil.changeLeader(cluster, leader.getId());
+ Assert.assertNotEquals(RaftPeerRole.LEADER, RaftServerTestUtil.getRole(leader));
+
+ // the blocked request should fail
+ testFailureCase("request should fail", futureBlocked::get,
+ ExecutionException.class, AlreadyClosedException.class);
+ stateMachine.unblockStartTransaction();
+
+ // send one more request which should timeout.
+ final RaftClientRequest requestTimeout = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+ final CompletableFuture<RaftClientReply> f = rpc.sendRequestAsync(requestTimeout);
+ testFailureCase("request should timeout", f::get,
+ ExecutionException.class, TimeoutIOException.class);
+ }
+
+ }
+
+ static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) {
+ final SimpleMessage m = new SimpleMessage("m" + seqNum);
+ return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,
+ RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
+ }
}