You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/01/19 10:21:25 UTC

incubator-ratis git commit: RATIS-192. In gRPC, appendEntries replies were received out of order on the Leader. Contributed by Mukul Kumar Singh

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 5b4bc0e7d -> 7b3a9a6f5


RATIS-192. In gRPC, appendEntries replies were received out of order on the Leader. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7b3a9a6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7b3a9a6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7b3a9a6f

Branch: refs/heads/master
Commit: 7b3a9a6f5f8e8075727d84e3ddeae7b594eda89c
Parents: 5b4bc0e
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Jan 19 18:20:56 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Jan 19 18:20:56 2018 +0800

----------------------------------------------------------------------
 .../ratis/grpc/server/RaftServerProtocolService.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7b3a9a6f/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
index a7a6990..1e499ae 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
@@ -27,6 +27,8 @@ import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
@@ -62,16 +64,27 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase
   public StreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseObserver) {
     return new StreamObserver<AppendEntriesRequestProto>() {
+      private final AtomicReference<CompletableFuture<Void>> previousOnNext =
+          new AtomicReference<>(CompletableFuture.completedFuture(null));
+
       @Override
       public void onNext(AppendEntriesRequestProto request) {
+        final CompletableFuture<Void> current = new CompletableFuture<>();
+        final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
         try {
-          server.appendEntriesAsync(request).thenAccept(responseObserver::onNext);
+          server.appendEntriesAsync(request).thenCombine(previous,
+              (reply, v) -> {
+            responseObserver.onNext(reply);
+            current.complete(null);
+            return null;
+          });
         } catch (Throwable e) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("{} got exception when appendEntries {}: {}",
                 getId(), ProtoUtils.toString(request.getServerRequest()), e);
           }
           responseObserver.onError(RaftGrpcUtil.wrapException(e));
+          current.completeExceptionally(e);
         }
       }