Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/10/29 07:38:40 UTC

[GitHub] [incubator-ratis] runzhiwang opened a new pull request #237: RATIS-1083. Create a transaction once the stream data is replicated to all servers

runzhiwang opened a new pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237


   
   
   ## What changes were proposed in this pull request?
   
   Create a transaction once the stream data is replicated to all servers
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1083
   
   ## How was this patch tested?
   
   TODO
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on pull request #237: [WIP]RATIS-1083. Create a transaction once the stream data is replicated to all servers

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-718456030


   The client may set the close flag when sending the last write.  That saves a packet, similar to the new design.
   
   Closing a file usually causes the OS to flush/sync the data.  We probably want to wait for that to finish before start-transaction.





[GitHub] [incubator-ratis] runzhiwang commented on pull request #237: RATIS-1083. Create a transaction once the stream data is replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-718429329


   @szetszwo 
   Could you help review this?  It is only a draft.  I am not sure whether my approach is right, so I have not added unit tests yet, and the code is still rough.
   
   I changed the original design; please correct me if I am wrong.
   1. The primary server does not need to send close-stream to the other servers to check whether they have received all the data, because it can check their STREAM_DATA replies instead.
   2. Do not use start-transaction.  When the primary server receives close-stream and has confirmed that the other servers have received all the data, it submits the RaftClientRequest to itself directly.  If a NotLeaderException occurs, the primary server sends close-stream to the suggested leader, and the suggested leader submits the RaftClientRequest to itself.
   3. Get the RaftClientRequest from DataStream.
   4. Encode the RaftClientReply into DataStreamReply#buffer.
   
   Original design:
   client —(close-stream)—> primary server —(close-stream)—> other servers
   primary server <—(ack-close-stream)— other servers
   primary server —(start-transaction)—> other servers
   client <—(RaftClientReply)— primary server <—(RaftClientReply)— leader
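
Editor's sketch: the revised flow in points 1 and 2 above can be illustrated roughly as follows. This is not the code from the pull request. `CloseStreamSketch`, `Outcome`, and `onCloseStream` are hypothetical names, and each peer's STREAM_DATA replies are reduced to a single boolean future for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CloseStreamSketch {
  /** Hypothetical outcomes; the real code would reply with a DataStreamReply. */
  enum Outcome { SUBMITTED_LOCALLY, FORWARDED_TO_LEADER, FAILED }

  /**
   * @param dataAcks one future per peer, true if that peer acknowledged
   *                 all STREAM_DATA packets (a stand-in for the real replies)
   * @param isLeader whether this primary server is currently the Raft leader
   */
  static CompletableFuture<Outcome> onCloseStream(
      List<CompletableFuture<Boolean>> dataAcks, boolean isLeader) {
    return CompletableFuture
        .allOf(dataAcks.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> {
          // All futures are complete here, so join() does not block.
          final boolean replicated = dataAcks.stream()
              .map(CompletableFuture::join)
              .allMatch(ack -> ack);
          if (!replicated) {
            return Outcome.FAILED;
          }
          // No separate close-stream round trip to the peers is needed:
          // submit locally, or hand over to the suggested leader.
          return isLeader ? Outcome.SUBMITTED_LOCALLY : Outcome.FORWARDED_TO_LEADER;
        });
  }

  public static void main(String[] args) {
    final List<CompletableFuture<Boolean>> acks = Arrays.asList(
        CompletableFuture.completedFuture(true),
        CompletableFuture.completedFuture(true));
    System.out.println(onCloseStream(acks, true).join());
    System.out.println(onCloseStream(acks, false).join());
  }
}
```

With an empty peer list the `allOf` future completes immediately, so in this sketch a single-server group still reaches the submit step.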





[GitHub] [incubator-ratis] szetszwo merged pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237


   





[GitHub] [incubator-ratis] runzhiwang commented on pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-721725243


   Reopening to trigger CI.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515794132



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
       When info.getDataStreamOutputs().size() == 0, it should call submitClientRequestAsync (i.e. primaryServerStartTransaction).

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {

Review comment:
       It should use the same executor as the write method.
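
Editor's sketch: a minimal illustration of the point above, assuming writes are dispatched to a dedicated executor. `SameExecutorSketch`, `writeExecutor`, and the thread name `stream-writer` are hypothetical. The only API relied on is the standard `CompletableFuture.thenApplyAsync(Function, Executor)` overload; without the executor argument, the stage runs on the common fork-join pool instead.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SameExecutorSketch {
  /** Runs a write stage and a close stage on one named single-thread executor. */
  static List<String> writeThenClose() {
    final ExecutorService writeExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "stream-writer"));
    final List<String> log = new CopyOnWriteArrayList<>();
    try {
      // Stand-in for info.getStream(); a String replaces the real stream type.
      final CompletableFuture<String> stream = CompletableFuture.completedFuture("stream");

      final CompletableFuture<Long> write = stream.thenApplyAsync(s -> {
        log.add("write on " + Thread.currentThread().getName());
        return 4L; // pretend four bytes were written
      }, writeExecutor);

      // The close stage names the same executor explicitly, so it cannot
      // run on a different pool than the write that precedes it.
      final CompletableFuture<Long> close = write.thenApplyAsync(bytes -> {
        log.add("close on " + Thread.currentThread().getName());
        return 0L;
      }, writeExecutor);

      close.join();
      return log;
    } finally {
      writeExecutor.shutdown();
    }
  }

  public static void main(String[] args) {
    writeThenClose().forEach(System.out::println);
  }
}
```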

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {

Review comment:
       Let's wait for RATIS-1126 and pass an executor to thenApplyAsync.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
       What if the group has only one server?  It seems that the primary server won't call submitClientRequestAsync if there are no other peers.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +214,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private boolean primary;

Review comment:
       This won't work.  A server may be a primary and non-primary at the same time.
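
Editor's sketch: one reading of the objection above is that the same server is presumably the primary for streams a client opens on it and a non-primary for streams forwarded to it by another primary, so a single server-wide boolean cannot hold both answers. The class below only illustrates that reading by keying the role on the stream. `PerStreamRoleSketch` and its methods are hypothetical and are not part of Ratis.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PerStreamRoleSketch {
  /** Role recorded per stream id rather than once per server. */
  private final Map<Long, Boolean> primaryByStream = new ConcurrentHashMap<>();

  /** Records the role when a stream header arrives. */
  void onHeader(long streamId, boolean receivedFromClient) {
    primaryByStream.put(streamId, receivedFromClient);
  }

  /** Unknown streams are treated as non-primary in this sketch. */
  boolean isPrimary(long streamId) {
    return primaryByStream.getOrDefault(streamId, false);
  }

  public static void main(String[] args) {
    final PerStreamRoleSketch server = new PerStreamRoleSketch();
    server.onHeader(1L, true);   // stream opened on this server by a client
    server.onHeader(2L, false);  // stream forwarded here by another primary
    System.out.println("stream 1 primary: " + server.isPrimary(1L));
    System.out.println("stream 2 primary: " + server.isPrimary(2L));
  }
}
```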

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
       We could combine primaryServerStartTransaction and peerServerStartTransaction into a single method:
   ```
     private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
       try {
         server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
           if (reply.isSuccess()) {
             ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
             sendReplySuccess(request, buffer, -1, ctx);
           } else if (info.getDataStreamOutputs().size() > 0) {
             // if this server is not the leader, forward start transaction to the other peers
             forwardStartTransaction(info, request, ctx);
           } else {
             sendReplyNotSuccess(request, ctx);
           }
         });
       } catch (IOException e) {
         sendReplyNotSuccess(request, ctx);
       }
     }
   ```
   Then, we don't need markPrimary().

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
       I see. Then we need to distinguish the primary from the other peers in the code. We may change DataStreamPacketHeaderProto.Type and split STREAM_CLOSE into two types:
   - STREAM_CLOSE
   - STREAM_CLOSE_FORWARDED
   When the client sends to the primary, it uses STREAM_CLOSE. When the primary sends to the other servers, it uses STREAM_CLOSE_FORWARDED.
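
   A minimal sketch of how the two close types could be told apart, assuming the split suggested above. The nested `PacketType` enum and the helper names `typeToForward` and `isPrimaryFor` are hypothetical stand-ins for illustration; they are not the generated DataStreamPacketHeaderProto.Type or any existing Ratis method.

```java
final class CloseTypeSketch {
  /** Hypothetical stand-in for the packet type enum; not the real generated proto enum. */
  enum PacketType { STREAM_HEADER, STREAM_DATA, STREAM_CLOSE, STREAM_CLOSE_FORWARDED }

  /** The type a server would put on a packet when relaying it to another server. */
  static PacketType typeToForward(PacketType received) {
    // A close coming from the client is rewritten, so the receiver knows it is not the primary.
    return received == PacketType.STREAM_CLOSE ? PacketType.STREAM_CLOSE_FORWARDED : received;
  }

  /** Under this assumption, only the server that received the client's own STREAM_CLOSE is the primary. */
  static boolean isPrimaryFor(PacketType received) {
    return received == PacketType.STREAM_CLOSE;
  }

  public static void main(String[] args) {
    final PacketType atPrimary = PacketType.STREAM_CLOSE;
    final PacketType atPeer = typeToForward(atPrimary);
    System.out.println(atPrimary + ": primary = " + isPrimaryFor(atPrimary));
    System.out.println(atPeer + ": primary = " + isPrimaryFor(atPeer));
  }
}
```

   With this shape, a server decides its role from the packet itself rather than from how many outputs it has, so a single-server cluster would still be treated as the primary.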

##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
 
 /** Support the {@link CloseAsync#closeAsync()} method. */
 public interface CloseAsync<REPLY> extends AutoCloseable {
-  /** Close asynchronously. */
+  /** Primary close asynchronously. */

Review comment:
       Please don't change this javadoc. CloseAsync is a general API for many different classes.

##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
 
 /** Support the {@link CloseAsync#closeAsync()} method. */
 public interface CloseAsync<REPLY> extends AutoCloseable {
-  /** Close asynchronously. */
+  /** Primary close asynchronously. */
   CompletableFuture<REPLY> closeAsync();
 
+  /** Peer close asynchronously. */
+  CompletableFuture<REPLY> closeForwardAsync();
+

Review comment:
       Please don't add it here since it is specific to Streaming.







[GitHub] [incubator-ratis] runzhiwang closed pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237


   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515819031



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
       @szetszwo Sorry. When info.getDataStreamOutputs().size() == 0, this server is a peer, so it only needs to respond to the primary server with the result of closing the stream. It probably does not need to call primaryServerStartTransaction.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
       @szetszwo Thanks for the reminder, you are right. I think we need to mark which server is the primary: https://github.com/apache/incubator-ratis/pull/237/commits/7c578d754385734b98e56cdd65705c06ee381b10. Otherwise, we cannot distinguish a peer in a cluster with 3 servers from the primary in a cluster with only one server. In both cases info.getDataStreamOutputs().size() == 0.
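
   The ambiguity can be shown with a small sketch. It assumes, as the thread implies, that the primary holds one output per other server and that a peer holds none; `outputCount` and `looksLikePrimary` are hypothetical helpers written for this illustration, not Ratis code.

```java
final class PrimaryGuessSketch {
  /** Assumed topology: the primary forwards to every other server, a peer forwards to nobody. */
  static int outputCount(boolean isPrimary, int clusterSize) {
    return isPrimary ? clusterSize - 1 : 0;
  }

  /** The heuristic under discussion: treat a server as primary only if it has outputs. */
  static boolean looksLikePrimary(int outputCount) {
    return outputCount > 0;
  }

  public static void main(String[] args) {
    // Primary of a single-server cluster versus a peer of a 3-server cluster.
    final int singlePrimary = outputCount(true, 1);
    final int peerOfThree = outputCount(false, 3);
    System.out.println("single-server primary outputs = " + singlePrimary);
    System.out.println("peer in 3-server cluster outputs = " + peerOfThree);
    System.out.println("same guess for both = "
        + (looksLikePrimary(singlePrimary) == looksLikePrimary(peerOfThree)));
  }
}
```

   Both servers report zero outputs, so the heuristic gives the same answer for two different roles. This is why an explicit marker for the primary is needed.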

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
       @szetszwo Thanks for the suggestions, I have updated the patch. But this still does not address the case where the primary server won't call submitClientRequestAsync if there are no other peers. The problem is:
   1. When a peer finishes closing the stream, it must reply success to the primary and must not call submitClientRequestAsync.
   2. When the primary finishes closing the stream and has received the success replies from the peers, it calls submitClientRequestAsync.
   So the peer and the primary behave differently when they finish closing the stream, and we must decide whether the current server is the primary or a peer. But for a single server without other peers, we cannot decide whether it is the primary or a peer.
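
   A rough sketch of the two behaviours described above, using plain CompletableFuture composition. The `Role` enum, the `afterClose` helper and the returned strings are hypothetical and only label the action; the real code would call submitClientRequestAsync or send a reply instead.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CloseFlowSketch {
  /** Hypothetical role marker; the thread is about how a server could learn this value. */
  enum Role { PRIMARY, PEER }

  /** Decide the next step once the local close and all forwarded closes have completed. */
  static CompletableFuture<String> afterClose(
      Role role, CompletableFuture<Void> localClose, List<CompletableFuture<Void>> peerCloses) {
    // allOf with an empty array completes immediately, which covers a server with no outputs.
    final CompletableFuture<Void> allPeers =
        CompletableFuture.allOf(peerCloses.toArray(new CompletableFuture[0]));
    return localClose.thenCombine(allPeers, (local, peers) ->
        role == Role.PRIMARY ? "start transaction" : "reply close result to primary");
  }

  public static void main(String[] args) {
    final CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    // A single-server primary and a peer both have an empty list of forwarded closes.
    System.out.println(afterClose(Role.PRIMARY, done, Collections.emptyList()).join());
    System.out.println(afterClose(Role.PEER, done, Collections.emptyList()).join());
  }
}
```

   The inputs to the two calls differ only in the role argument, which is the point of the comment: with no peers, nothing else in the state tells the server which branch to take.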

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
       @szetszwo Thanks for the suggestion. Updated.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       @amaliujia We call startTransaction only after the stream has finished closing, so there is no need to use previous.get(). Closing a stream is split into two phases, close stream and start transaction, and the server replies only after start transaction finishes.
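
The two-phase close described here can be sketched with plain `CompletableFuture` composition: the transaction is started only after the local close and every remote close have completed, and the single reply is whatever the transaction step produces. This is a minimal illustration under assumed names (`TwoPhaseCloseSketch`, `closeThenStartTransaction`); the real code works with Ratis request and reply types rather than `String`.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of "close stream, then start transaction, then reply". */
final class TwoPhaseCloseSketch {
  static CompletableFuture<String> closeThenStartTransaction(
      CompletableFuture<Void> localClose,
      List<CompletableFuture<Void>> remoteCloses,
      Supplier<CompletableFuture<String>> startTransaction) {
    // Phase 1: wait for the local close and all remote closes.
    final CompletableFuture<Void> allClosed = CompletableFuture
        .allOf(remoteCloses.toArray(new CompletableFuture[0]))
        .thenCombine(localClose, (u, v) -> null);
    // Phase 2: only now start the transaction; its result is the reply.
    return allClosed.thenCompose(v -> startTransaction.get());
  }

  public static void main(String[] args) {
    final CompletableFuture<Void> local = new CompletableFuture<>();
    final CompletableFuture<Void> remote = new CompletableFuture<>();
    final CompletableFuture<String> reply = closeThenStartTransaction(
        local, Collections.singletonList(remote),
        () -> CompletableFuture.completedFuture("RaftClientReply"));
    local.complete(null);
    System.out.println("done before remote close? " + reply.isDone());
    remote.complete(null);
    System.out.println(reply.join());
  }
}
```

In this sketch a failed close completes the returned future exceptionally and the transaction is never started; whether that matches the intended error handling in the patch is an assumption.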

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       @amaliujia We cannot conclude that. Not being the leader is only one case; there may be other unexpected causes. But we should call forwardStartTransaction anyway.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       Let me update the comment

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       @amaliujia No. Please see here:
   
   client —(close-stream)—> primary server —(close-stream)—> other servers
   primary server <—(ack-close-stream)— other servers
   primary server —(start-transaction)—> other servers
   client <—(RaftClientReply)— primary server <—(RaftClientReply)— leader
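
The flow above, combined with the fallback in `forwardStartTransaction`, can be sketched as: try to submit locally, and if that is not successful, ask every other server and report success if any one of them succeeds. The class and method names below are assumptions made for illustration, and `Boolean` stands in for the real reply types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Hypothetical sketch of "submit locally, otherwise forward to the peers". */
final class ForwardFallbackSketch {
  /** Completes with true if the local submit or at least one peer succeeds. */
  static CompletableFuture<Boolean> startTransaction(
      Supplier<CompletableFuture<Boolean>> localSubmit,
      List<Supplier<CompletableFuture<Boolean>>> peers) {
    return localSubmit.get().thenCompose(ok -> {
      if (ok) {
        return CompletableFuture.completedFuture(true);
      }
      // The local submit was not successful (for example, not the leader):
      // forward start-transaction to every peer.
      final List<CompletableFuture<Boolean>> results =
          peers.stream().map(Supplier::get).collect(Collectors.toList());
      return CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
          .thenApply(v -> results.stream()
              .map(CompletableFuture::join)
              .reduce(false, Boolean::logicalOr));
    });
  }

  public static void main(String[] args) {
    final List<Supplier<CompletableFuture<Boolean>>> peers = new ArrayList<>();
    peers.add(() -> CompletableFuture.completedFuture(false)); // a follower
    peers.add(() -> CompletableFuture.completedFuture(true));  // the leader
    System.out.println(
        startTransaction(() -> CompletableFuture.completedFuture(false), peers).join());
  }
}
```

Unlike the diff, which sends the success reply from inside each peer callback, this sketch only computes the combined outcome and leaves the actual reply to the caller.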

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {

Review comment:
       Updated.







[GitHub] [incubator-ratis] amaliujia commented on pull request #237: [WIP]RATIS-1083. Create a transaction once the stream data is replicated to all servers

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-718932263


   cc @amaliujia 





[GitHub] [incubator-ratis] runzhiwang commented on pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-721707728


   @szetszwo I have updated the patch to use ExecutorService in the async calls. Could you help review it again?





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r516369839



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       I think `startTransaction` is not chained with the `previous` async operation. Is `startTransaction` still executed only after all previous operations have completed?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {

Review comment:
       It might be better to just use
   ```
   if (reply.isSuccess()) 
   else if (request.getType() == Type.STREAM_CLOSE)
   else if (request.getType() == Type.START_TRANSACTION)
   else
   ```
   

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       I might have missed something: can we conclude from a failure to apply `STREAM_CLOSE` to the RaftServer that the server is not the leader?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       ```
   CompletableFuture<?> current = previous.get().....
   previous.set(current);
   ```
   Should `startTransaction` be chained with this logic?
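
For reference, the `previous.get()` / `previous.set(current)` pattern asked about here can be sketched as follows: each new step is composed onto the tail of the chain, so it runs only after every earlier step has completed. This is a hypothetical, simplified illustration (`PreviousChainSketch` and `submit` are assumed names) and not the code in `NettyServerStreamRpc`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical sketch of ordering async steps through a "previous" future. */
final class PreviousChainSketch {
  private final AtomicReference<CompletableFuture<?>> previous =
      new AtomicReference<>(CompletableFuture.completedFuture(null));

  /** Runs op only after every earlier submitted op has completed. */
  synchronized <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> op) {
    final CompletableFuture<T> current = previous.get().thenCompose(v -> op.get());
    previous.set(current);
    return current;
  }

  public static void main(String[] args) {
    final PreviousChainSketch chain = new PreviousChainSketch();
    final CompletableFuture<String> write = new CompletableFuture<>();
    chain.submit(() -> write);
    final CompletableFuture<String> txn =
        chain.submit(() -> CompletableFuture.completedFuture("transaction started"));
    System.out.println("started before write finished? " + txn.isDone());
    write.complete("written");
    System.out.println(txn.join());
  }
}
```

Note that in this sketch a failed earlier step also fails every later step, because `thenCompose` skips its function when the upstream future completes exceptionally.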

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       I might have missed something: can we conclude from a failure to apply `STREAM_CLOSE` to the RaftServer that the server is not the leader?

Review comment:
       Got it. I was a bit confused by the comment here :)

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       I see. So the client controls when to start closing the stream, and it will likely do so only after confirming that the data has been streamed successfully. Thus there is no need to chain `startTransaction`?
   
   If so, a client might still try to close the stream before confirming that the data has been written successfully, but I guess that is an exceptional case that we don't need to address now.
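
The client-side behaviour assumed in this comment, closing only after all writes are acknowledged, could look roughly like the sketch below. It is an illustration with hypothetical names (`ClientCloseSketch`, `closeAfterWrites`, `demo`) and plain `CompletableFuture`s in place of the real data stream client API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical sketch: a client that issues close only after all writes are acknowledged. */
final class ClientCloseSketch {
  /** Wait for every outstanding write-ack, then issue the close. */
  static CompletableFuture<String> closeAfterWrites(
      List<CompletableFuture<Boolean>> writeAcks,
      Supplier<CompletableFuture<String>> close) {
    return CompletableFuture.allOf(writeAcks.toArray(new CompletableFuture<?>[0]))
        .thenCompose(v -> close.get());
  }

  /** Returns "closeIssuedBeforeLastAck,closeIssuedAfterLastAck,closeResult". */
  static String demo() {
    final CompletableFuture<Boolean> ack1 = CompletableFuture.completedFuture(true);
    final CompletableFuture<Boolean> ack2 = new CompletableFuture<>(); // still pending
    final AtomicBoolean closeIssued = new AtomicBoolean();

    final CompletableFuture<String> closed = closeAfterWrites(Arrays.asList(ack1, ack2), () -> {
      closeIssued.set(true);
      return CompletableFuture.completedFuture("closed");
    });

    final boolean before = closeIssued.get(); // close has not been issued yet
    ack2.complete(true);                      // the last write is acknowledged
    return before + "," + closeIssued.get() + "," + closed.join();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

If any write-ack completes exceptionally, the close in this sketch is never issued and the returned future fails instead; a client that closes early would bypass this guard, which is the exceptional case mentioned above.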

Review comment:
       Got it. Thanks







[GitHub] [incubator-ratis] szetszwo commented on pull request #237: [WIP]RATIS-1083. Create a transaction once the stream data is replicated to all servers

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-718452423


   The original design is to make sure that close is applied in every server before sending start-transaction.  You are asking whether we could optimise it by not combining close and start-transaction.
   
   I think the optimisation may not be significant because it is a stream.
   - Original design: When we send close, we do not have to wait for the last write-ack in the stream.  We will wait for the close-ack and then send start-transaction.
   - New design: We wait for the write-ack and then send close.
   
   In both cases, we have to wait for one ack.  So the optimisation seems minor.
   
   If we implement the new design, we need to handle the case where some server fails (or is very slow) to close after start-transaction has been received.
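
The comparison of the two designs can be made concrete by listing the client-side steps and counting the acks the client blocks on. This is only a toy model of the comment: `CloseOrderSketch` and the `Step` names are hypothetical, and it assumes that in the new design the close message itself leads the server to start the transaction, so no separate start-transaction message is listed.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the client-side message order in the two designs. */
final class CloseOrderSketch {
  enum Step { SEND_LAST_WRITE, WAIT_WRITE_ACK, SEND_CLOSE, WAIT_CLOSE_ACK, SEND_START_TRANSACTION }

  /** Original design: close is sent right behind the last write; the client waits for the close-ack. */
  static List<Step> originalDesign() {
    return Arrays.asList(
        Step.SEND_LAST_WRITE, Step.SEND_CLOSE, Step.WAIT_CLOSE_ACK, Step.SEND_START_TRANSACTION);
  }

  /** New design (as read from the comment): the client waits for the write-ack, then sends close. */
  static List<Step> newDesign() {
    return Arrays.asList(Step.SEND_LAST_WRITE, Step.WAIT_WRITE_ACK, Step.SEND_CLOSE);
  }

  /** Number of acks the client has to wait for in the given sequence. */
  static long waits(List<Step> steps) {
    return steps.stream().filter(s -> s.name().startsWith("WAIT_")).count();
  }

  public static void main(String[] args) {
    System.out.println("original design waits: " + waits(originalDesign()));
    System.out.println("new design waits:      " + waits(newDesign()));
  }
}
```

Both sequences contain exactly one wait, which matches the observation that the optimisation saves a message rather than a round trip.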





[GitHub] [incubator-ratis] runzhiwang commented on pull request #237: [WIP]RATIS-1083. Create a transaction once the stream data is replicated to all servers

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#issuecomment-718473579


   @szetszwo Thanks, got it.

