You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/01/12 01:17:52 UTC

incubator-ratis git commit: RATIS-189. AppendRequestStreamObserver close can be called twice. Contributed by Mukul Kumar Singh

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 1f4b5727b -> 8cac9d54f


RATIS-189. AppendRequestStreamObserver close can be called twice.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8cac9d54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8cac9d54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8cac9d54

Branch: refs/heads/master
Commit: 8cac9d54f1e3e713afe41c170da59a41efd81334
Parents: 1f4b572
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Jan 12 09:16:54 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Jan 12 09:16:54 2018 +0800

----------------------------------------------------------------------
 .../ratis/grpc/client/RaftClientProtocolService.java     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8cac9d54/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 6d19920..1f604d8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -111,10 +112,12 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
     private final StreamObserver<RaftClientReplyProto> responseObserver;
     private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
         = new SlidingWindow.Server<>(name, COMPLETED);
+    private final AtomicBoolean isClosed;
 
     AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
       LOG.debug("new AppendRequestStreamObserver {}", name);
       this.responseObserver = ro;
+      this.isClosed = new AtomicBoolean(false);
     }
 
     void processClientRequestAsync(PendingAppend pending) {
@@ -171,9 +174,11 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
     }
 
     private void close() {
-      LOG.debug("{}: close", name);
-      responseObserver.onCompleted();
-      slidingWindow.close();
+      if (isClosed.compareAndSet(false, true)) {
+        LOG.debug("{}: close", name);
+        responseObserver.onCompleted();
+        slidingWindow.close();
+      }
     }
 
     void responseError(Throwable t, Supplier<String> message) {