Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:56:26 UTC

[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

runzhiwang commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515819031



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
      @szetszwo Sorry. When info.getDataStreamOutputs().size() == 0, this server is a peer, so it only needs to respond to the primary server with the close-stream result. Maybe it does not need to call primaryServerStartTransaction.

Review comment:
      @szetszwo Thanks for the reminder; you are right, so I think we need to mark which server is the primary: https://github.com/apache/incubator-ratis/pull/237/commits/7c578d754385734b98e56cdd65705c06ee381b10. Otherwise, we cannot distinguish a peer in a 3-server cluster from the primary in a single-server cluster, because info.getDataStreamOutputs().size() == 0 for both.
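A minimal Java sketch of the ambiguity described above. The classes RoleInference and StreamInfoSketch and the primary flag are hypothetical illustrations, not Ratis APIs, and the downstream DataStreamOutputs are modeled as plain strings. It compares inferring the role from the number of downstream outputs with recording the role explicitly when the stream is created; how the real patch sets such a flag is not shown here and is left as an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: these classes are not part of Ratis.
class RoleInference {
  // The ambiguous approach: treat a server as primary iff it forwards to other servers.
  static boolean inferPrimaryFromOutputs(List<String> outputs) {
    return !outputs.isEmpty();
  }

  // The alternative: record the role explicitly when the stream is created.
  static final class StreamInfoSketch {
    final boolean primary;
    final List<String> outputs; // stand-in for the downstream DataStreamOutputs

    StreamInfoSketch(boolean primary, List<String> outputs) {
      this.primary = primary;
      this.outputs = outputs;
    }
  }

  public static void main(String[] args) {
    // Primary of a 3-server cluster: forwards to two peers.
    StreamInfoSketch primaryOf3 = new StreamInfoSketch(true, Arrays.asList("peer1", "peer2"));
    // Peer of a 3-server cluster: no downstream outputs.
    StreamInfoSketch peerOf3 = new StreamInfoSketch(false, Collections.<String>emptyList());
    // Primary of a single-server cluster: also no downstream outputs.
    StreamInfoSketch singlePrimary = new StreamInfoSketch(true, Collections.<String>emptyList());

    for (StreamInfoSketch s : Arrays.asList(primaryOf3, peerOf3, singlePrimary)) {
      System.out.println("explicit=" + s.primary + ", inferred=" + inferPrimaryFromOutputs(s.outputs));
    }
  }
}
```

The last line printed is "explicit=true, inferred=false": the single-server primary looks exactly like a peer when only the output count is used.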

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
      @szetszwo Thanks for the suggestions; I have updated the patch. However, it still does not address the problem that the primary server won't call submitClientRequestAsync when there are no other peers. The problem is:
      1. When a peer finishes closing the stream, it must reply success to the primary and must not call submitClientRequestAsync.
      2. When the primary finishes closing the stream and has received success replies from the peers, it calls submitClientRequestAsync.
      So the peer and the primary behave differently after closing the stream, and we must decide whether the current server is the primary or a peer. But for a single server with no other peers, we cannot decide whether it is the primary or a peer.
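The two rules above can be sketched with CompletableFuture, assuming each server knows explicitly whether it is the primary rather than inferring it from its peer list. CloseProtocolSketch, Action, and onClose are hypothetical names; in the patch itself the corresponding work is done with DataStreamOutput.closeAsync() and server.submitClientRequestAsync(...) in NettyServerStreamRpc. Peer close results are modeled as CompletableFuture<Boolean>.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the STREAM_CLOSE decision; names are illustrative, not Ratis APIs.
class CloseProtocolSketch {
  // What a server does once its local close and all downstream closes have completed.
  enum Action { REPLY_CLOSE_RESULT, SUBMIT_TRANSACTION }

  static CompletableFuture<Action> onClose(boolean isPrimary,
      CompletableFuture<Void> localClose, List<CompletableFuture<Boolean>> peerCloses) {
    // allOf() of an empty array is already complete, so a server without peers does not block here.
    final CompletableFuture<Void> allPeers =
        CompletableFuture.allOf(peerCloses.toArray(new CompletableFuture<?>[0]));
    return localClose.thenCombine(allPeers, (local, peers) -> {
      if (!peerCloses.stream().allMatch(f -> f.join())) {
        throw new IllegalStateException("a peer failed to close the stream");
      }
      // Rule 1: a peer only reports its close result to the primary.
      // Rule 2: the primary, having heard from every peer, starts the transaction.
      return isPrimary ? Action.SUBMIT_TRANSACTION : Action.REPLY_CLOSE_RESULT;
    });
  }

  public static void main(String[] args) {
    final CompletableFuture<Void> closed = CompletableFuture.completedFuture(null);
    final List<CompletableFuture<Boolean>> noPeers = Collections.emptyList();
    final List<CompletableFuture<Boolean>> twoPeers = Arrays.asList(
        CompletableFuture.completedFuture(true), CompletableFuture.completedFuture(true));

    System.out.println(onClose(true, closed, twoPeers).join());  // primary of a 3-server cluster
    System.out.println(onClose(false, closed, noPeers).join());  // peer of a 3-server cluster
    System.out.println(onClose(true, closed, noPeers).join());   // single-server primary
  }
}
```

The third call is the single-server case: because the role is passed in rather than derived from the (empty) peer list, the primary still starts the transaction instead of behaving like a peer.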

+            if (isPrimary()) {
+              // after all servers close the stream, the primary server starts the transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
      @szetszwo Thanks for the suggestion. Updated.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transaction to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
      @amaliujia We call startTransaction only after the stream has finished closing, so there is no need to use previous.get(). Closing a stream is split into two phases: closing the stream, then starting the transaction. The server replies only after starting the transaction has finished.
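
The two-phase close described above can be sketched with plain `CompletableFuture`s. This is a minimal, hypothetical model, not the Ratis code. `localClose`, `remoteCloses` and `startTransaction` are placeholders for closing the local `WritableByteChannel`, the peers' `closeForwardAsync()` futures and `submitClientRequestAsync(...)`. The sketch shows that the transaction starts only after every close has completed. The client-facing result comes from the transaction step, so this step needs no extra `previous.get()` chaining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of a two-phase close: close everywhere, then start the transaction. */
class TwoPhaseClose {
  /**
   * Phase 1 waits for the local close and every remote close.
   * Phase 2 starts the transaction only after phase 1 completes; its result is what the client sees.
   */
  static <R> CompletableFuture<R> closeThenStartTransaction(
      CompletableFuture<Long> localClose,
      List<CompletableFuture<?>> remoteCloses,
      Supplier<CompletableFuture<R>> startTransaction) {
    final List<CompletableFuture<?>> all = new ArrayList<>(remoteCloses);
    all.add(localClose);
    // allOf completes only when every close has completed (normally or exceptionally).
    return CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0]))
        .thenCompose(ignored -> startTransaction.get());
  }
}
```

If any close fails, `allOf` completes exceptionally and the transaction is never started. The returned future then fails as well.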

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transaction to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
      @amaliujia We cannot conclude that. Not being the leader is only one possible cause; there may be other, unexpected causes. Either way, we should call forwardStartTransaction.
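
Because a non-success reply does not necessarily mean "not the leader", the request is forwarded to the peers whatever the cause. The aggregation done in `forwardStartTransaction` can be sketched as follows: succeed if any peer succeeds, and fail only when all peers fail. This is a hypothetical, simplified model. `replyToClient` and the `Boolean` results stand in for `sendReplySuccess`/`sendReplyNotSuccess` and the peers' `startTransactionAsync()` replies.

The sketch adds one guard that is not in the diff: an `AtomicBoolean` makes sure the client gets exactly one reply. The diff calls `sendReplySuccess` inside every peer callback, so it would reply more than once if several peers reported success.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical sketch: forward start-transaction to all peers and reply to the client exactly once. */
class ForwardStartTransaction {
  static CompletableFuture<Boolean> forward(
      List<CompletableFuture<Boolean>> peerResults, Consumer<Boolean> replyToClient) {
    final AtomicBoolean replied = new AtomicBoolean(false);
    for (CompletableFuture<Boolean> f : peerResults) {
      f.thenAccept(success -> {
        // The first successful peer (normally the leader) triggers the single success reply.
        if (success && replied.compareAndSet(false, true)) {
          replyToClient.accept(true);
        }
      });
    }
    return CompletableFuture.allOf(peerResults.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> {
          // Same result as map(join).reduce(false, Boolean::logicalOr) in the diff.
          final boolean anySuccess = peerResults.stream().anyMatch(f -> f.join());
          // Reply "not success" only if no peer succeeded and nothing has been sent yet.
          if (!anySuccess && replied.compareAndSet(false, true)) {
            replyToClient.accept(false);
          }
          return anySuccess;
        });
  }
}
```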

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transaction to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
      Let me update the comment.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transaction to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
      @amaliujia No. The message flow is:
   
   client —(close-stream)—> primary server —(close-stream)—> other servers
   primary server <—(ack-close-stream)— other servers
   primary server —(start-transaction)—> other servers
   client <—(RaftClientReply)— primary server <—(RaftClientReply)— leader
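
The flow above can be traced with a small, single-threaded model. This is a hypothetical sketch for illustration only: server names are plain strings, and the log entries mirror the arrows in the diagram rather than any Ratis API. The model also includes one step that the diagram leaves implicit but the diff performs. The primary first submits the request itself, and it forwards start-transaction to the other servers only if that local attempt is not successful.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, single-threaded trace of the close-stream / start-transaction flow.
 * "leader" is whichever server can successfully submit the request.
 */
class StreamCloseFlow {
  static List<String> run(String primary, List<String> others, String leader) {
    final List<String> log = new ArrayList<>();
    log.add("client -> " + primary + ": close-stream");

    // Phase 1: forward close-stream to every other server and collect all acks.
    for (String peer : others) {
      log.add(primary + " -> " + peer + ": close-stream");
    }
    for (String peer : others) {
      log.add(peer + " -> " + primary + ": ack-close-stream");
    }

    // Phase 2: the primary first submits the request itself.
    if (primary.equals(leader)) {
      log.add("client <- " + primary + ": RaftClientReply");
      return log;
    }

    // The local attempt was not successful (e.g. not the leader): forward start-transaction.
    for (String peer : others) {
      log.add(primary + " -> " + peer + ": start-transaction");
    }
    if (others.contains(leader)) {
      // The leader's reply is relayed back to the client by the primary.
      log.add(primary + " <- " + leader + ": RaftClientReply");
      log.add("client <- " + primary + ": RaftClientReply");
    } else {
      log.add("client <- " + primary + ": not-success");
    }
    return log;
  }
}
```

For example, `StreamCloseFlow.run("s1", Arrays.asList("s2", "s3"), "s3")` ends with `s1 <- s3: RaftClientReply` followed by `client <- s1: RaftClientReply`. Both start-transaction messages appear only after both acks.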

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {

Review comment:
       Updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org