You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/10 09:35:28 UTC

[incubator-ratis] branch master updated: RATIS-1142. Remove STREAM_CLOSE_FORWARD (#265)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aa4d58  RATIS-1142. Remove STREAM_CLOSE_FORWARD (#265)
6aa4d58 is described below

commit 6aa4d58656fb35855724eb8085d897a7bc16d363
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Nov 10 17:28:58 2020 +0800

    RATIS-1142. Remove STREAM_CLOSE_FORWARD (#265)
---
 .../apache/ratis/client/DataStreamOutputRpc.java   |  3 ---
 .../ratis/client/impl/DataStreamClientImpl.java    |  5 -----
 .../ratis/netty/server/NettyServerStreamRpc.java   | 26 ++++++++++++++--------
 ratis-proto/src/main/proto/Raft.proto              |  3 +--
 4 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
index 19fb46b..ba48324 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
@@ -27,9 +27,6 @@ public interface DataStreamOutputRpc extends DataStreamOutput {
   /** Get the future of the header request. */
   CompletableFuture<DataStreamReply> getHeaderFuture();
 
-  /** Peer close asynchronously. */
-  CompletableFuture<DataStreamReply> closeForwardAsync();
-
   /** Create a transaction asynchronously once the stream data is replicated to all servers */
   CompletableFuture<DataStreamReply> startTransactionAsync();
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2798de6..27583b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -88,11 +88,6 @@ public class DataStreamClientImpl implements DataStreamClient {
     }
 
     @Override
-    public CompletableFuture<DataStreamReply> closeForwardAsync() {
-      return send(Type.STREAM_CLOSE_FORWARD);
-    }
-
-    @Override
     public CompletableFuture<DataStreamReply> startTransactionAsync() {
       return send(Type.START_TRANSACTION);
     }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 496e83c..dff6a87 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -254,7 +255,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
       return new StreamInfo(request, stateMachine.data().stream(request),
-          server.getId().equals(request.getServerId())?
+          isPrimary(request.getServerId())?
           proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
     } catch (Throwable e) {
       throw new CompletionException(e);
@@ -357,6 +358,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     });
   }
 
+  private boolean isPrimary(RaftPeerId primaryId) {
+    return server.getId().equals(primaryId);
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
@@ -379,7 +384,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
         remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
       }
-    } else if (request.getType() == Type.STREAM_CLOSE || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+    } else if (request.getType() == Type.STREAM_CLOSE) {
       info = streams.get(key);
       final CompletableFuture<?> previous = info.getPrevious().get();
 
@@ -392,9 +397,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         }
       }, executorService);
 
-      if (request.getType() == Type.STREAM_CLOSE) {
+      if (isPrimary(info.getRequest().getServerId())) {
         for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
-          remoteWrites.add(previous.thenComposeAsync(v -> out.closeForwardAsync(), executorService));
+          remoteWrites.add(previous.thenComposeAsync(v -> out.closeAsync(), executorService));
         }
       }
     } else if (request.getType() == Type.START_TRANSACTION) {
@@ -411,13 +416,16 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
           if (request.getType() == Type.STREAM_HEADER
-              || request.getType() == Type.STREAM_DATA
-              || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+              || request.getType() == Type.STREAM_DATA) {
             sendReply(remoteWrites, request, bytesWritten, ctx);
           } else if (request.getType() == Type.STREAM_CLOSE) {
-            // after all server close stream, primary server start transaction
-            // TODO(runzhiwang): send start transaction to leader directly
-            startTransaction(info, request, ctx);
+            if (isPrimary(info.getRequest().getServerId())) {
+              // after all servers close the stream, the primary server starts the transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              startTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }
           } else {
             throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
           }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f3517d6..c799e79 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -287,8 +287,7 @@ message DataStreamPacketHeaderProto {
     STREAM_HEADER = 0;
     STREAM_DATA = 1;
     STREAM_CLOSE = 2;
-    STREAM_CLOSE_FORWARD = 3;
-    START_TRANSACTION = 4;
+    START_TRANSACTION = 3;
   }
 
   uint64 streamId = 1;