You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/01/19 10:21:25 UTC
incubator-ratis git commit: RATIS-192. In gRPC,
appendEntries replies were received out of order on the Leader.
Contributed by Mukul Kumar Singh
Repository: incubator-ratis
Updated Branches:
refs/heads/master 5b4bc0e7d -> 7b3a9a6f5
RATIS-192. In gRPC, appendEntries replies were received out of order on the Leader. Contributed by Mukul Kumar Singh
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7b3a9a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7b3a9a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7b3a9a6f
Branch: refs/heads/master
Commit: 7b3a9a6f5f8e8075727d84e3ddeae7b594eda89c
Parents: 5b4bc0e
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Jan 19 18:20:56 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Jan 19 18:20:56 2018 +0800
----------------------------------------------------------------------
.../ratis/grpc/server/RaftServerProtocolService.java | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7b3a9a6f/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
index a7a6990..1e499ae 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -27,6 +27,8 @@ import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
@@ -62,16 +64,27 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
public StreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseObserver) {
return new StreamObserver<AppendEntriesRequestProto>() {
+ private final AtomicReference<CompletableFuture<Void>> previousOnNext =
+ new AtomicReference<>(CompletableFuture.completedFuture(null));
+
@Override
public void onNext(AppendEntriesRequestProto request) {
+ final CompletableFuture<Void> current = new CompletableFuture<>();
+ final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
try {
- server.appendEntriesAsync(request).thenAccept(responseObserver::onNext);
+ server.appendEntriesAsync(request).thenCombine(previous,
+ (reply, v) -> {
+ responseObserver.onNext(reply);
+ current.complete(null);
+ return null;
+ });
} catch (Throwable e) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} got exception when appendEntries {}: {}",
getId(), ProtoUtils.toString(request.getServerRequest()), e);
}
responseObserver.onError(RaftGrpcUtil.wrapException(e));
+ current.completeExceptionally(e);
}
}