Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/03 13:54:17 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

szetszwo commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r515794132



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
       When info.getDataStreamOutputs().size() == 0, it should call submitClientRequestAsync (i.e. primaryServerStartTransaction).
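A minimal Java sketch of the single-server case raised above. It uses hypothetical names: `peerCount` stands in for `info.getDataStreamOutputs().size()`, `submitLocally` for a call to `server.submitClientRequestAsync(...)`, and `multiServerPath` for whatever the primary does when it has peers. That branch is cut off in the diff, so it is not modeled here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class SingleServerClose {
  private SingleServerClose() {}

  /**
   * Decides how the primary proceeds after its local close completes.
   * All parameters are hypothetical stand-ins for the code in the diff.
   */
  static <R> CompletableFuture<R> afterClose(int peerCount,
      Supplier<CompletableFuture<R>> submitLocally,
      Supplier<CompletableFuture<R>> multiServerPath) {
    if (peerCount == 0) {
      // Single-server group: no other server can start the transaction,
      // so it has to be submitted on this server.
      return submitLocally.get();
    }
    // Other peers exist: take the multi-server path (not modeled here).
    return multiServerPath.get();
  }
}
```

With `peerCount == 0`, only `submitLocally` is invoked, so a one-server group still starts its transaction.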

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {

Review comment:
       It should use the same executor as the write method.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {

Review comment:
       Let's wait for RATIS-1126 and pass an executor to thenApplyAsync.
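`CompletableFuture.thenApplyAsync` has a two-argument overload that takes an `Executor`. The one-argument form runs on the default asynchronous executor, which is normally `ForkJoinPool.commonPool()`. Below is a minimal sketch of passing the stream executor explicitly. It assumes a hypothetical `closeOn` helper that returns the running thread's name, purely to make the executor choice visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class ExecutorChaining {
  private ExecutorChaining() {}

  /**
   * Runs a (hypothetical) close step for the stream on the given executor and
   * returns the name of the thread that ran it, only to show which pool was used.
   */
  static <S> CompletableFuture<String> closeOn(CompletableFuture<S> stream, Executor executor) {
    // The two-argument thenApplyAsync always hands the function to 'executor',
    // even when 'stream' has already completed. The one-argument form would
    // use the default async executor instead.
    return stream.thenApplyAsync(s -> {
      // A real close step would close the stream's channel here.
      return Thread.currentThread().getName();
    }, executor);
  }
}
```

The later revision of the diff already passes `executorService` to `thenCombineAsync`. Passing the same executor to the close and start-transaction steps would keep all per-stream work on that one pool.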

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -292,35 +297,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (info.getDataStreamOutputs().size() > 0) {

Review comment:
      What if the group has only one server? It seems that the primary server won't call submitClientRequestAsync if there are no other peers.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +214,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private boolean primary;

Review comment:
      This won't work. A server may be a primary and a non-primary at the same time, e.g. the primary for one stream and a non-primary peer for another.
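A single `primary` field is shared by every stream on the server. It therefore cannot describe a server that is the primary for one stream and a forwarded peer for another. Below is a minimal sketch of recording the role per stream instead. It assumes a hypothetical `StreamRoles` registry keyed by a string (the diff uses `StreamMap.Key`). It also assumes the role is known when the header arrives, which this thread does not specify.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StreamRoles {
  /** Role of this server for one particular stream. */
  enum Role { PRIMARY, FORWARDED }

  // One entry per open stream, so different streams can have different roles
  // on the same server at the same time.
  private final Map<String, Role> roles = new ConcurrentHashMap<>();

  void onHeader(String streamKey, Role role) {
    roles.put(streamKey, role);
  }

  Role roleOf(String streamKey) {
    return roles.get(streamKey);
  }

  void onClosed(String streamKey) {
    roles.remove(streamKey);
  }
}
```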

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
      We may combine primaryServerStartTransaction and peerServerStartTransaction into a single method:
   ```
     private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
       try {
         server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
           if (reply.isSuccess()) {
             ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
             sendReplySuccess(request, buffer, -1, ctx);
           } else if (info.getDataStreamOutputs().size() > 0) {
          // if this server is not the leader, forward the start-transaction request to the other peers
             forwardStartTransaction(info, request, ctx);
           } else {
             sendReplyNotSuccess(request, ctx);
           }
         });
       } catch (IOException e) {
         sendReplyNotSuccess(request, ctx);
       }
     }
   ```
   Then, we don't need markPrimary().
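The suggested `forwardStartTransaction` presumably keeps the aggregation rule from `askPeerStartTransaction`: the request succeeds if at least one peer accepts it. Below is a minimal sketch of that rule as a single future. It assumes each input future holds one peer's result, with `true` meaning that peer accepted the transaction. `ForwardStartTransaction.anySucceeded` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ForwardStartTransaction {
  private ForwardStartTransaction() {}

  /**
   * Completes with true if at least one peer accepted the start-transaction
   * request, false otherwise (including when there are no peers).
   */
  static CompletableFuture<Boolean> anySucceeded(List<CompletableFuture<Boolean>> results) {
    return CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0]))
        // All futures are complete here, so join() does not block.
        .thenApply(ignored -> results.stream().anyMatch(CompletableFuture::join));
  }
}
```

`anyMatch` is equivalent to `reduce(false, Boolean::logicalOr)` but stops at the first `true`. Returning one boolean also lets the caller send exactly one reply. By contrast, the diff calls `sendReplySuccess` from every successful peer callback. If any peer future completes exceptionally, `allOf` completes exceptionally, and so does the result.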

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +311,128 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void primaryServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            // if primary server is not the leader, primary ask all the other peers to start transaction
+            askPeerStartTransaction(info, request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
+  private void askPeerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
+  private void peerServerStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    info.getStream().thenApplyAsync(stream -> {
+      try {
+        server.submitClientRequestAsync(info.getRequest()).thenApplyAsync(reply -> {
+          if (reply.isSuccess()) {
+            ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+            sendReplySuccess(request, buffer, -1, ctx);
+          } else {
+            sendReplyNotSuccess(request, ctx);
+          }
+          return null;
+        });
+      } catch (IOException e) {
+        sendReplyNotSuccess(request, ctx);
+      }
+      return null;
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeAsync());
+      }
+    } else {
+      // peer server start transaction
+      peerServerStartTransaction(streams.get(key), request, ctx);
+      return;
     }
 
     final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
     final CompletableFuture<?> current = previous.get()
         .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
-          sendReply(remoteWrites, request, bytesWritten, ctx);
+          if (request.getType() == Type.STREAM_HEADER || request.getType() == Type.STREAM_DATA) {
+            sendReply(remoteWrites, request, bytesWritten, ctx);
+          } else if (request.getType() == Type.STREAM_CLOSE) {
+            if (isPrimary()) {
+              // after all server close stream, primary server start transaction
+              // TODO(runzhiwang): send start transaction to leader directly
+              primaryServerStartTransaction(info, request, ctx);
+            } else {
+              sendReply(remoteWrites, request, bytesWritten, ctx);
+            }

Review comment:
      I see. Then we need to distinguish the primary from the other peers in the code. We may change DataStreamPacketHeaderProto.Type by splitting STREAM_CLOSE into two types:
   - STREAM_CLOSE
   - STREAM_CLOSE_FORWARDED
   When the client sends to the primary, it uses STREAM_CLOSE. When the primary forwards to the other servers, it uses STREAM_CLOSE_FORWARDED.
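Below is a minimal sketch of the proposed split. It uses a hypothetical local enum that mirrors only the `DataStreamPacketHeaderProto.Type` values mentioned in this review, not the real proto enum. The primary receives `STREAM_CLOSE` from the client and starts the transaction. The other servers receive `STREAM_CLOSE_FORWARDED` and only acknowledge the close, as the non-primary branch of the diff does with `sendReply`.

```java
final class CloseTypeDispatch {
  private CloseTypeDispatch() {}

  /** Hypothetical mirror of the packet types discussed in this review. */
  enum Type { STREAM_HEADER, STREAM_DATA, STREAM_CLOSE, STREAM_CLOSE_FORWARDED }

  /** What a server does once the close (local and forwarded) has completed. */
  enum Action { START_TRANSACTION, REPLY_CLOSE }

  static Action afterClose(Type type) {
    switch (type) {
      case STREAM_CLOSE:
        // Sent by the client, so this server is the primary for the stream.
        return Action.START_TRANSACTION;
      case STREAM_CLOSE_FORWARDED:
        // Sent by the primary, so this server only acknowledges the close.
        return Action.REPLY_CLOSE;
      default:
        throw new IllegalArgumentException("Not a close packet: " + type);
    }
  }

  /** Type the primary uses when forwarding a packet to the other servers. */
  static Type forwardedType(Type received) {
    // Only STREAM_CLOSE changes; other types are assumed to be forwarded unchanged.
    return received == Type.STREAM_CLOSE ? Type.STREAM_CLOSE_FORWARDED : received;
  }
}
```

Each packet carries the role, so a server can be the primary for one stream and a forwarded peer for another without any per-server flag.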

##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
 
 /** Support the {@link CloseAsync#closeAsync()} method. */
 public interface CloseAsync<REPLY> extends AutoCloseable {
-  /** Close asynchronously. */
+  /** Primary close asynchronously. */

Review comment:
      Please don't change this javadoc. CloseAsync is a general API used by many different classes.

##########
File path: ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
##########
@@ -22,9 +22,12 @@
 
 /** Support the {@link CloseAsync#closeAsync()} method. */
 public interface CloseAsync<REPLY> extends AutoCloseable {
-  /** Close asynchronously. */
+  /** Primary close asynchronously. */
   CompletableFuture<REPLY> closeAsync();
 
+  /** Peer close asynchronously. */
+  CompletableFuture<REPLY> closeForwardAsync();
+

Review comment:
       Please don't add it here since it is specific to Streaming.
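One way to follow this suggestion is to put `closeForwardAsync()` on a streaming-specific sub-interface, so that `CloseAsync` stays general. Below is a minimal sketch. `CloseAsync` here is a simplified stand-in: its default `close()` is an assumption, not copied from Ratis. `ForwardingStreamOutput` and `FakeStreamOutput` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

/** Simplified stand-in for org.apache.ratis.io.CloseAsync; the default close() is an assumption. */
interface CloseAsync<REPLY> extends AutoCloseable {
  /** Close asynchronously. */
  CompletableFuture<REPLY> closeAsync();

  @Override
  default void close() throws Exception {
    closeAsync().get();
  }
}

/** Hypothetical streaming-only interface that carries the forwarded close. */
interface ForwardingStreamOutput<REPLY> extends CloseAsync<REPLY> {
  /** Close a stream on a server that received it from the primary. */
  CompletableFuture<REPLY> closeForwardAsync();
}

/** Hypothetical implementation used only to exercise the two interfaces. */
final class FakeStreamOutput implements ForwardingStreamOutput<String> {
  @Override
  public CompletableFuture<String> closeAsync() {
    return CompletableFuture.completedFuture("closed");
  }

  @Override
  public CompletableFuture<String> closeForwardAsync() {
    return CompletableFuture.completedFuture("closed-forwarded");
  }
}
```

Classes that implement only `CloseAsync` are unaffected. In the sketch, `CloseAsync` remains a functional interface that a lambda can implement.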




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org