Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/10/03 07:09:07 UTC

[GitHub] [incubator-ratis] amaliujia opened a new pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

amaliujia opened a new pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213


   
   ## What changes were proposed in this pull request?
   
   Once a stream server has received data from a client, it should forward the data to the other servers in the group. For simplicity, this JIRA uses a star topology. In the future, the star topology will be replaced by pipelines in order to avoid additional cross-rack traffic.
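
The star topology described above can be sketched as follows. This is only an illustration under stated assumptions: `Sink` and `CountingSink` are hypothetical stand-ins and are not Ratis types, and the real server forwards Netty buffers rather than byte arrays.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class StarForwardSketch {
  /** Hypothetical stand-in for a stream endpoint; not a Ratis type. */
  interface Sink {
    CompletableFuture<Integer> writeAsync(byte[] data);
  }

  /** An in-memory sink that only counts the bytes it has received. */
  static final class CountingSink implements Sink {
    private int bytes;

    @Override
    public synchronized CompletableFuture<Integer> writeAsync(byte[] data) {
      bytes += data.length;
      return CompletableFuture.completedFuture(data.length);
    }

    synchronized int getBytes() {
      return bytes;
    }
  }

  /** Star topology: the receiving server writes locally and sends the same packet to every peer. */
  static CompletableFuture<Void> forward(byte[] data, Sink local, List<Sink> peers) {
    final List<CompletableFuture<?>> writes = new ArrayList<>();
    writes.add(local.writeAsync(data));
    for (Sink peer : peers) {
      writes.add(peer.writeAsync(data));
    }
    // The returned future completes only after the local write and all peer writes complete.
    return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]));
  }

  public static void main(String[] args) {
    final CountingSink local = new CountingSink();
    final List<Sink> peers = new ArrayList<>();
    peers.add(new CountingSink());
    peers.add(new CountingSink());
    forward(new byte[16], local, peers).join();
    System.out.println("local bytes = " + local.getBytes());
  }
}
```

In a star, every peer write leaves from the first server, which is why the description expects pipelines to reduce cross-rack traffic later.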
   
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1082
   
   ## How was this patch tested?
   
   Unit tests.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508125067



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       This is a very good point. Right now it only keeps a consistent mapping between stream ids, and there may be cases where the stream ids differ across servers.
   
   However, even with consistent stream ids on all servers, that might not be enough for debugging. Ideally, we need a way to log the path of a request (e.g. from the client to the server, then to the other servers), the stream id mappings, and where things went wrong. Best of all would be to aggregate this information and send it back to the client.
   
   
   https://issues.apache.org/jira/browse/RATIS-1098 is partially related to this goal. We could extend RATIS-1098 or open another Jira to fully achieve it.
   
   There are still some open problems that are not handled. For example, what should we do if a request in the middle of a stream fails, given that the stream is ordered? How do we debug it, and how do we retry or fail the stream?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507480100



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       Use an ArrayList, then call toArray when passing it to CompletableFuture.allOf(...).
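
One reason this suggestion helps, sketched under the assumption that some branch might leave a slot of the pre-sized array unfilled: `CompletableFuture.allOf` throws a `NullPointerException` if any element is null, whereas a list only ever contains the futures that were actually added. The class and method names below are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfFromListSketch {
  /** Combines however many futures were actually added; there is no size to precompute. */
  static CompletableFuture<Void> allOf(List<CompletableFuture<?>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  /** Returns true if allOf rejects a pre-sized array with an unfilled (null) slot. */
  static boolean arrayWithGapFails() {
    final CompletableFuture<?>[] slots = new CompletableFuture<?>[2];
    slots[0] = CompletableFuture.completedFuture(1L);
    // slots[1] is deliberately left null, as if one branch forgot to fill it.
    try {
      CompletableFuture.allOf(slots);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    final List<CompletableFuture<?>> writes = new ArrayList<>();
    writes.add(CompletableFuture.completedFuture(1L));
    allOf(writes).join();
    System.out.println("array with a gap fails: " + arrayWithGapFails());
  }
}
```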







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507957005



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       `peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());` will trigger header requests to the other servers when a header request (i.e. a new stream id) is received.
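
A general Java point that may be relevant here: the value argument of `putIfAbsent` is evaluated on every call, even when the key is already present, while the mapping function of `computeIfAbsent` runs only when the key is absent. The sketch below illustrates just that language behavior; `openOutputs` is a hypothetical stand-in for `getDataStreamOutput()`, and whether the extra evaluations matter in this PR depends on what `client.stream()` does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class PutIfAbsentSketch {
  /** Hypothetical stand-in for opening per-peer outputs; it only counts how often it is called. */
  static String openOutputs(AtomicInteger opened) {
    return "outputs-" + opened.incrementAndGet();
  }

  /** putIfAbsent: the value expression runs for every packet, even when the key exists. */
  static int openedWithPutIfAbsent(int packets) {
    final ConcurrentMap<Long, String> map = new ConcurrentHashMap<>();
    final AtomicInteger opened = new AtomicInteger();
    for (int i = 0; i < packets; i++) {
      map.putIfAbsent(1L, openOutputs(opened));
    }
    return opened.get();
  }

  /** computeIfAbsent: the mapping function runs only for the first packet of the stream. */
  static int openedWithComputeIfAbsent(int packets) {
    final ConcurrentMap<Long, String> map = new ConcurrentHashMap<>();
    final AtomicInteger opened = new AtomicInteger();
    for (int i = 0; i < packets; i++) {
      map.computeIfAbsent(1L, id -> openOutputs(opened));
    }
    return opened.get();
  }

  public static void main(String[] args) {
    System.out.println("putIfAbsent opened " + openedWithPutIfAbsent(3) + " time(s)");
    System.out.println("computeIfAbsent opened " + openedWithComputeIfAbsent(3) + " time(s)");
  }
}
```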







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503541557



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       Agreed. Let me think about this.
   
   Making the writes happen in parallel does make sense.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507016775



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -97,33 +98,44 @@ public WritableByteChannel getWritableByteChannel() {
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
-      writeRequest = request;
+      if (request.getClientId().equals(impl.getHeader().getClientId())) {

Review comment:
       All the state machines share `writeRequest`, so it is set only when this check passes.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502204193



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       Yes. I was wondering whether we need a read API to verify that a write is complete. I would appreciate suggestions here: do we need a read API to write a better test, e.g. one that uses the read API to verify that all writes have succeeded?







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507109304



##########
File path: ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
##########
@@ -40,4 +40,4 @@ public long getDataLength() {
   public ByteBuffer slice() {
     return buffer.slice();
   }
-}
+}

Review comment:
       Please revert whitespace-only changes like this.

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -166,8 +213,11 @@ public void runTestDataStream(){
     Assert.assertEquals(writeRequest.getCallId(), impl.getHeader().getCallId());
     Assert.assertEquals(writeRequest.getRaftGroupId(), impl.getHeader().getRaftGroupId());
     Assert.assertEquals(writeRequest.getServerId(), impl.getHeader().getServerId());
-
-    Assert.assertEquals(dataSize, byteWritten);
+    int actualBytesWritten = 0;
+    for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
+      actualBytesWritten += s.getByteWritten();
+    }
+    Assert.assertEquals(dataSize * peers.size(), actualBytesWritten);

Review comment:
       Check individual sizes.
   ```
       for (SingleDataStreamStateMachine s : singleDataStreamStateMachines) {
         Assert.assertEquals(dataSize, s.getByteWritten());
       }
   ```
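
To see why the per-server check is stricter than the summed check, consider a hypothetical failure in which one server receives the data twice and another receives nothing. The sketch below uses made-up byte counts and helper names to illustrate it.

```java
final class PerReplicaCheckSketch {
  /** The summed check from the patch: total bytes across servers equals dataSize * servers. */
  static boolean sumMatches(int dataSize, int[] written) {
    int total = 0;
    for (int w : written) {
      total += w;
    }
    return total == dataSize * written.length;
  }

  /** The suggested check: every server must have written exactly dataSize bytes. */
  static boolean eachMatches(int dataSize, int[] written) {
    for (int w : written) {
      if (w != dataSize) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Made-up counts: the first server got the data twice, the second got nothing.
    final int[] written = {20, 0};
    System.out.println("sum check passes: " + sumMatches(10, written));
    System.out.println("per-server check passes: " + eachMatches(10, written));
  }
}
```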
   

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?> localWrites =
+            streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, shouldRelease))
+            .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+        parallelWrites[0] = localWrites;

Review comment:
       We can use isHeader to distinguish the two cases.
   ```
           final CompletableFuture<?> localWrites = isHeader?
               streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
               : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
           parallelWrites[parallelWrites.length - 1] = localWrites;
   ```

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,31 +65,44 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClientImpl> clients = new ArrayList<>();

Review comment:
       NettyServerStreamRpc should depend on the public API, not on the impl classes.  Therefore, it should use DataStreamClient instead of DataStreamClientImpl.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -146,6 +190,13 @@ ChannelFuture buildChannel() {
         .bind();
   }
 
+  public void setupClient(List<RaftPeer> otherPeers, RaftProperties properties) {
+    for (RaftPeer peer : otherPeers) {
+      DataStreamClientImpl impl = new DataStreamClientImpl(peer, properties, null);

Review comment:
       Use DataStreamClient.Builder

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();

Review comment:
       Change it to
   ```
   final boolean isHeader = request.getStreamOffset() == -1;
   ```

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +136,33 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?> localWrites =
+            streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, shouldRelease))
+            .thenApply(stream -> writeTo(buf, stream, shouldRelease.get()));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+        for (int i = 0; i < clients.size(); i++) {
+          // do not need to forward header request
+          if (request.getStreamOffset() == -1) {
+            DataStreamClientImpl.DataStreamOutputImpl impl =
+                (DataStreamClientImpl.DataStreamOutputImpl) peersStreamOutput.get(request.getStreamId()).get(i);
+            parallelWrites[i + 1] = impl.getHeaderFuture();
+          } else {
+            CompletableFuture<?> remoteWrites;
+            if (shouldRelease.get()) {
+              remoteWrites = new CompletableFuture<>();
+            } else {
+              remoteWrites =
+                  peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+            }
+            parallelWrites[i + 1] = remoteWrites;
+          }
+        }

Review comment:
       Similarly, use isHeader for the remote writes:
   ```
           if (isHeader) {
             // header
             final List<DataStreamOutput> outs = getDataStreamOutput();
             peersStreamOutput.put(request.getStreamId(), outs);
             for (int i = 0; i < outs.size(); i++) {
               parallelWrites[i] = outs.get(i).getHeaderFuture();
             }
           } else {
             // body
             final List<DataStreamOutput> outs =  peersStreamOutput.get(request.getStreamId());
             for (int i = 0; i < clients.size(); i++) {
               parallelWrites[i] = outs.get(i).writeAsync(request.slice().nioBuffer());
             }
           }
   ```
   
   Also, add getHeaderFuture() to DataStreamClient.

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       The correct way is to make startClient retry.  In a distributed system, we cannot assume that all the servers have already started.
   
   Retrying needs more work.  I am fine with adding NettyServerStreamRpc.startClientToPeers as a workaround in the meantime.  However, please remove DataStreamServerRpc.startClientToPeers.
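
A minimal sketch of what a retrying start could look like, assuming a bounded number of attempts with a fixed sleep between them. `Startable` and `startWithRetry` are hypothetical names for illustration and are not part of Ratis; a production version would likely use a configurable retry policy.

```java
final class RetryStartSketch {
  /** Hypothetical stand-in for a client whose start() can fail while its peer is still down. */
  interface Startable {
    void start() throws Exception;
  }

  /** Tries start() up to maxAttempts times and returns the attempt number that succeeded. */
  static int startWithRetry(Startable client, int maxAttempts, long sleepMillis) {
    for (int attempt = 1; ; attempt++) {
      try {
        client.start();
        return attempt;
      } catch (Exception e) {
        if (attempt >= maxAttempts) {
          throw new IllegalStateException("failed to start after " + attempt + " attempts", e);
        }
      }
      try {
        // Give the peer some time to come up before the next attempt.
        Thread.sleep(sleepMillis);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting to retry", ie);
      }
    }
  }

  public static void main(String[] args) {
    final int[] calls = {0};
    final int attempts = startWithRetry(() -> {
      if (++calls[0] < 3) {
        throw new java.io.IOException("peer not started yet");
      }
    }, 5, 10L);
    System.out.println("started on attempt " + attempts);
  }
}
```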

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -18,9 +18,17 @@
 
 package org.apache.ratis.netty.server;
 
+import java.util.stream.Collectors;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ratis.client.api.DataStreamOutput;
+import org.apache.ratis.client.impl.DataStreamClientImpl;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;

Review comment:
       Please sort the new imports with the existing imports.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -155,6 +206,13 @@ public void startServer() {
     channelFuture.syncUninterruptibly();
   }
 
+  @Override
+  public void startClientToPeers() {
+    for (DataStreamClientImpl client : clients) {
+      client.start();

Review comment:
       Add start() to DataStreamClient.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502093573



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -140,6 +160,14 @@ ChannelFuture buildChannel() {
         .bind();
   }
 
+  public void setupClient(List<RaftPeer> otherPeers, RaftProperties properties) {

Review comment:
       public -> private







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507501923



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       @szetszwo How about using the following code? Having sendReply be in charge of buf.release() seems confusing.
   CompletableFuture.allOf(parallelWrites).whenComplete((t, r) -> {
     buf.release();
     sendReply(request, ctx, buf);
   });







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508125067



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       This is a very good point. Right now it indeed only keeps a consistent mapping between stream ids, and there might be cases where these stream ids differ.
   
   However, I think that even with consistent stream ids on all servers, it might not be good enough for debugging. Ideally, we need a way to log the path of a request (e.g. from the client to the server, then to the other servers), the stream id mappings, and where things went wrong. The best outcome would then be to aggregate this information and send it back to the client.
   
   
   https://issues.apache.org/jira/browse/RATIS-1098 is partially related to this goal. We could either extend RATIS-1098 or open another Jira to fully achieve it.
   
   There are still some open problems that are not handled. For example, what should we do if a request in the middle of a stream fails (given that the stream is ordered)? How do we debug it, and how do we retry or fail the stream?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help review this? With the implementation in this patch, the streamId would always be one, which also confuses me.
   In my understanding, each request has a unique streamId. I think we can change the patch with the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       if (!streamMap.containsKey(streamId)) {
         List<DataStreamOutput> streamOutputList = new ...
         for (DataStreamClientImpl client : clients) {
            streamOutputList.add(client.stream());
         }
         streamMap.put(streamId, streamOutputList);
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the streamId received from the client->server request?
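
The per-stream map suggested above could also be filled with `ConcurrentMap.computeIfAbsent`. Unlike a `containsKey`/`put` pair it is atomic, and unlike `putIfAbsent(key, value)` it only builds the value when the key is missing. A minimal sketch of that idea follows; `PerStreamOutputs`, `PeerClient`, and `openStream` are hypothetical stand-ins for illustration, not the Ratis `DataStreamClient` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PerStreamOutputs {
  /** Hypothetical stand-in for a client connected to one peer; not the Ratis API. */
  static final class PeerClient {
    final AtomicInteger opened = new AtomicInteger();

    /** Opens a new outgoing stream and returns a label for it. */
    String openStream() {
      return "stream-" + opened.incrementAndGet();
    }
  }

  private final List<PeerClient> clients;
  private final ConcurrentMap<Long, List<String>> outputsByStreamId = new ConcurrentHashMap<>();

  PerStreamOutputs(List<PeerClient> clients) {
    this.clients = clients;
  }

  /** Returns the outputs for a stream id, opening them only on the first request. */
  List<String> outputsFor(long streamId) {
    return outputsByStreamId.computeIfAbsent(streamId, id -> {
      final List<String> outputs = new ArrayList<>();
      for (PeerClient client : clients) {
        outputs.add(client.openStream());
      }
      return outputs;
    });
  }

  /** Returns how many streams one peer client opened for two distinct stream ids. */
  public static int demo() {
    final PeerClient peer = new PeerClient();
    final PerStreamOutputs forwarder = new PerStreamOutputs(Arrays.asList(peer));
    forwarder.outputsFor(1L);
    forwarder.outputsFor(1L); // same stream id: reuses the existing outputs
    forwarder.outputsFor(2L); // new stream id: opens one more stream per peer
    return peer.opened.get();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints 2
  }
}
```

With this shape, a repeated request on the same stream id never opens extra peer streams, which is the property the map is meant to provide.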







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507485476



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -140,7 +189,7 @@ public void runTestDataStream(){
     final int bufferSize = 1024*1024;
     final int bufferNum = 10;
     final DataStreamOutput out = client.stream();
-    DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
+    impl = (DataStreamClientImpl.DataStreamOutputImpl) out;

Review comment:
       Please revert this unnecessary change to keep the code easy to review. If you want to refactor it, you can create another PR. Please also check other places for similar changes.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507015364



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -97,33 +98,44 @@ public WritableByteChannel getWritableByteChannel() {
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
-      writeRequest = request;
+      if (request.getClientId().equals(impl.getHeader().getClientId())) {

Review comment:
       Why do we need this check?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508119969



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       Okay, reasonable







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507025392



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       lazy connection is OK. 
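
A lazy connection here could mean that the clients to peers are created on first use instead of through a separate `startClientToPeers` call. A minimal sketch of that idea follows; `LazyPeerClients`, `getClients`, and the string stand-ins for clients are all hypothetical and not part of the Ratis API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LazyPeerClients {
  private final List<String> peerAddresses;
  private List<String> clients; // created on first use; guarded by "this"
  private int connectCount = 0; // counts how many stand-in clients were created

  LazyPeerClients(List<String> peerAddresses) {
    this.peerAddresses = peerAddresses;
  }

  /** Returns the clients to peers, creating them only on the first call. */
  synchronized List<String> getClients() {
    if (clients == null) {
      clients = new ArrayList<>();
      for (String address : peerAddresses) {
        clients.add("client->" + address); // hypothetical stand-in for connecting
        connectCount++;
      }
    }
    return clients;
  }

  /** Returns {connections before first use, connections after two uses}. */
  public static int[] demo() {
    final LazyPeerClients server = new LazyPeerClients(Arrays.asList("s1", "s2"));
    final int before = server.connectCount;
    server.getClients();
    server.getClients(); // second call reuses the existing clients
    return new int[] {before, server.connectCount};
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(demo())); // prints [0, 2]
  }
}
```

Because nothing connects until the first forwarded request, all stream servers can be started in any order.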







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507501923



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       @szetszwo @amaliujia How about using the following code? Having sendReply be in charge of buf.release() seems confusing.
   ```
   CompletableFuture.allOf(parallelWrites).whenComplete((t, r) ->  {
             buf.release();
             sendReply(request, ctx, buf);
   });
   ```







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501252367



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       Yeah, this is a good point.
   
   Each `client.stream()` call increases the stream id, and I am not sure whether changing the stream id matters. For example, if the client sends 100 messages, they all share the same stream id and have 100 different message ids. If we call `client.stream()` each time here, the messages forwarded to peers will instead have different stream ids and the same message id (assuming the message id counts from 0 for each stream id), which does not seem right.
   
   However, I am not sure when the stream id for the forwarded stream should be updated within a `NettyServerStreamRpc`.
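
The difference between the two id patterns can be illustrated with a toy model. This is only a sketch: `Client` and `Output` are hypothetical classes, and the assumption that both stream ids and message ids count from 0 is made for illustration rather than taken from the Ratis implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class StreamIdDemo {
  /** Hypothetical client: each stream() call hands out the next stream id. */
  static final class Client {
    private final AtomicLong nextStreamId = new AtomicLong();

    Output stream() {
      return new Output(nextStreamId.getAndIncrement());
    }
  }

  /** Hypothetical output: message ids count from 0 within one stream. */
  static final class Output {
    private final long streamId;
    private long nextMessageId = 0;

    Output(long streamId) {
      this.streamId = streamId;
    }

    /** Sends one message and returns its "streamId:messageId" label. */
    String send() {
      return streamId + ":" + nextMessageId++;
    }
  }

  /** Opens a new stream for every message, as calling stream() each time would. */
  public static List<String> streamPerMessage(int n) {
    final Client client = new Client();
    final List<String> ids = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ids.add(client.stream().send());
    }
    return ids;
  }

  /** Opens one stream and sends every message through it. */
  public static List<String> oneStream(int n) {
    final Output out = new Client().stream();
    final List<String> ids = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      ids.add(out.send());
    }
    return ids;
  }

  public static void main(String[] args) {
    System.out.println(streamPerMessage(3)); // prints [0:0, 1:0, 2:0]
    System.out.println(oneStream(3));        // prints [0:0, 0:1, 0:2]
  }
}
```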







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499948036



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       Maybe `clients` is enough and we do not need `streams`, which causes confusion. When we need a stream, we can get it from `client.stream()`.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507501923



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       Okay, I see. Maybe we can rename sendReply, because having sendReply be in charge of buf.release() seems confusing.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507494361



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       It needs to wait for all the futures to complete before releasing.  Actually, let's just call buf.release() in the beginning instead of using try-finally.
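
The ordering constraint can be sketched with plain `CompletableFuture`s. In this illustration the strings "release" and "reply" are stand-ins for `buf.release()` and `sendReply(...)`, and the class name `ReleaseAfterAll` is hypothetical; no Netty types are involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReleaseAfterAll {
  /** Runs the sketch and returns the order in which the events happened. */
  public static List<String> demo() {
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    final CompletableFuture<Void> localWrite = new CompletableFuture<>();
    final CompletableFuture<Void> peerWrite = new CompletableFuture<>();

    // The callback runs only after every write has completed, normally or exceptionally.
    CompletableFuture.allOf(localWrite, peerWrite).whenComplete((ignored, error) -> {
      events.add("release"); // stand-in for buf.release()
      events.add("reply");   // stand-in for sendReply(...)
    });

    events.add("local write done");
    localWrite.complete(null);
    events.add("peer write done");
    peerWrite.complete(null);
    return events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [local write done, peer write done, release, reply]
  }
}
```

Since `whenComplete` also runs when a write fails, the buffer stand-in is released on the error path as well.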







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499124158



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       We have this API because all stream servers need to be set up first; then the servers that need to forward messages call `startClientToPeers` to initialize the clients that connect to their peers.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499970362



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }

Review comment:
       DataStreamServerImpl streamServer;
   if (i == 0) {
     // only the first server routes requests to peers.
     List<RaftPeer> otherPeers = new ArrayList<>(peers);
     otherPeers.remove(peers.get(i));
     streamServer =
         new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
   } else {
     streamServer =
         new DataStreamServerImpl(peers.get(i), properties, null);
   }
   servers.add(streamServer);
   streamServer.getServerRpc().startServer();







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507949667



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       I am not doing this for two reasons:
   1. The number of peers is known and small, so a fixed-size array fits here.
   2. Because `CompletableFuture.allOf(...)` exposes a generic API, Java will type the result of converting that list to an array as `Object[]` (not `CompletableFuture<?>[]`), so I would need a manual cast, which is not compile-time type safe.
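
For reference, the list-based alternative can avoid a cast if the typed `List.toArray(T[])` overload is used, since only the no-argument `toArray()` returns `Object[]`. A minimal sketch follows; the class name `AllOfFromList` and the helper `allOf` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfFromList {
  /** Combines a list of futures without any manual cast. */
  public static CompletableFuture<Void> allOf(List<CompletableFuture<?>> futures) {
    // toArray(T[]) returns CompletableFuture<?>[] directly.
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  /** Returns true if the combined future completes only after both inputs do. */
  public static boolean demo() {
    final List<CompletableFuture<?>> writes = new ArrayList<>();
    final CompletableFuture<Long> local = new CompletableFuture<>();
    final CompletableFuture<String> peer = new CompletableFuture<>();
    writes.add(local);
    writes.add(peer);

    final CompletableFuture<Void> all = allOf(writes);
    local.complete(42L);
    final boolean doneTooEarly = all.isDone(); // one write is still pending
    peer.complete("OK");
    return !doneTooEarly && all.isDone();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints true
  }
}
```

The fixed-size array from the first reason remains a reasonable choice when the number of peers is known up front.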







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507954892



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       The header is sent when we create a DataStreamOutput: https://github.com/apache/incubator-ratis/blob/88327daff3261cb3e2929d7fbcbba1ec51d67a34/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java#L79
   
   So the first header request triggers two more header requests when we initialize the two DataStreamOutputs for the current stream. We then need to wait for all header requests to succeed before replying to the client that its header request succeeded.
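
A minimal sketch of the "wait for every header before replying" idea, using plain `CompletableFuture`s rather than the Ratis types. The class and method names (`HeaderFanOut`, `awaitHeaders`) are hypothetical and are not part of the patch; the futures stand in for the local header write and the header futures of the per-peer DataStreamOutputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not Ratis code: combines the local header future with the peer header futures.
final class HeaderFanOut {
  /**
   * Completes only after the local header and every peer header have completed.
   * Completes exceptionally if any of them fails, so a partial success is never acknowledged.
   */
  static CompletableFuture<Void> awaitHeaders(
      CompletableFuture<?> localHeader, List<? extends CompletableFuture<?>> peerHeaders) {
    List<CompletableFuture<?>> all = new ArrayList<>(peerHeaders);
    all.add(localHeader);
    return CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0]));
  }

  public static void main(String[] args) {
    CompletableFuture<String> local = CompletableFuture.completedFuture("local");
    CompletableFuture<String> peer1 = new CompletableFuture<>();
    CompletableFuture<String> peer2 = new CompletableFuture<>();

    // The reply to the client is chained behind all three headers.
    CompletableFuture<Void> reply = awaitHeaders(local, Arrays.asList(peer1, peer2))
        .thenRun(() -> System.out.println("reply to client: header OK"));

    peer1.complete("peer1");
    System.out.println("replied after only one peer? " + reply.isDone());
    peer2.complete("peer2");
    reply.join();
  }
}
```

With this shape the server replies once, after the slowest peer has acknowledged its header, which matches the behaviour described in the comment above.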







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507338537



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       OK, I have chosen the workaround that exposes `NettyServerStreamRpc.startClientToPeers`, and filed https://issues.apache.org/jira/browse/RATIS-1099 to track building the connections automatically without an explicit call.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which also confuses me.
   In my understanding, each request has a unique streamId. I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       if (!streamMap.containsKey(streamId)) {
         streamMap.put(streamId, new ArrayList<>());
         for (DataStreamClientImpl client : clients) {
            streamMap.get(streamId).add(client.stream());
         }
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we reuse the streamId received on the client->server request?
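
The check-then-put on `streamMap` in the snippet above can also be written with `ConcurrentMap.computeIfAbsent`, which creates the per-stream list at most once even when requests for the same streamId arrive concurrently. The sketch below is generic and self-contained; `PerStreamOutputs` and its `Supplier`-based factories are hypothetical stand-ins for the `client.stream()` calls, not Ratis API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper: O stands in for DataStreamOutput, each Supplier for one peer client's stream().
final class PerStreamOutputs<O> {
  private final ConcurrentMap<Long, List<O>> outputsByStream = new ConcurrentHashMap<>();
  private final List<Supplier<O>> peerOutputFactories;

  PerStreamOutputs(List<Supplier<O>> peerOutputFactories) {
    this.peerOutputFactories = peerOutputFactories;
  }

  /** Returns the outputs for the stream, opening one per peer the first time the id is seen. */
  List<O> get(long streamId) {
    return outputsByStream.computeIfAbsent(streamId, id -> {
      List<O> outputs = new ArrayList<>(peerOutputFactories.size());
      for (Supplier<O> factory : peerOutputFactories) {
        outputs.add(factory.get());
      }
      return outputs;
    });
  }

  public static void main(String[] args) {
    List<Supplier<String>> factories = new ArrayList<>();
    factories.add(() -> "output-to-peer-1");
    factories.add(() -> "output-to-peer-2");
    PerStreamOutputs<String> outputs = new PerStreamOutputs<>(factories);

    // Both calls see the same list; the factories only run on the first call.
    System.out.println(outputs.get(1L) == outputs.get(1L));
  }
}
```

Unlike `putIfAbsent(key, createOutputs())`, the mapping function here runs only when the key is missing, so no extra outputs are opened and then discarded.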







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502207309



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think we do not need to add a read API. We can verify it with SingleDataStreamStateMachine: define byteWritten in SingleDataStreamStateMachine, and since both the leader and the followers use SingleDataStreamStateMachine to write data, we can check SingleDataStreamStateMachine#byteWritten to verify that all writes have succeeded.
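
As a rough illustration of that verification approach, the sketch below counts bytes in a stub state machine and checks that every server recorded the same total. `CountingStateMachine` is a hypothetical stand-in for the test's SingleDataStreamStateMachine, not the real class, and no networking is involved.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class ByteCountCheck {
  /** Hypothetical stand-in for a test state machine that only counts the bytes it is given. */
  static final class CountingStateMachine {
    private final AtomicLong byteWritten = new AtomicLong();

    void write(ByteBuffer data) {
      byteWritten.addAndGet(data.remaining());
    }

    long getByteWritten() {
      return byteWritten.get();
    }
  }

  /** True if every state machine has recorded exactly the expected number of bytes. */
  static boolean allWrote(List<CountingStateMachine> machines, long expected) {
    return machines.stream().allMatch(m -> m.getByteWritten() == expected);
  }

  public static void main(String[] args) {
    List<CountingStateMachine> machines = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      machines.add(new CountingStateMachine());
    }

    ByteBuffer chunk = ByteBuffer.allocate(1024);
    long expected = 0;
    for (int n = 0; n < 10; n++) {
      for (CountingStateMachine m : machines) {
        m.write(chunk.duplicate()); // every server receives the same bytes
      }
      expected += chunk.remaining();
    }
    System.out.println("all servers wrote " + expected + " bytes: " + allWrote(machines, expected));
  }
}
```

In the real test the counters would be read from the state machines of the primary and of each peer after all client futures have completed.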







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508121893



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       @szetszwo @amaliujia Should we reuse the streamId received from the client on the current server? That may make debugging easier when errors happen. I am not sure when the streamId would differ among the 3 servers, but if it does, having different streamIds for the same stream on the 3 servers seems confusing.
   
   ```
   private List<DataStreamOutput> getDataStreamOutput(long streamId) {
       return clients.stream().map(client -> client.stream(streamId)).collect(Collectors.toList());
   }
   ```







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499970362



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }

Review comment:
       DataStreamServerImpl streamServer;
       if (i == 0) {
         // only the first server routes requests to peers.
         List<RaftPeer> otherPeers = new ArrayList<>(peers);
         otherPeers.remove(peers.get(i));
         streamServer =
             new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
       } else {
         streamServer =
             new DataStreamServerImpl(peers.get(i), properties, null);
       }
       servers.add(streamServer);
       streamServer.getServerRpc().startServer();










[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507413965



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
##########
@@ -50,6 +53,10 @@
 
     private Builder() {}
 
+    public static Builder getClientBuilder() {

Review comment:
       Why not use newBuilder()?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507957927



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -140,7 +189,7 @@ public void runTestDataStream(){
     final int bufferSize = 1024*1024;
     final int bufferNum = 10;
     final DataStreamOutput out = client.stream();
-    DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
+    impl = (DataStreamClientImpl.DataStreamOutputImpl) out;

Review comment:
       oops. Done!







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which also seems weird.
   In my understanding, each request has a unique streamId.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501249518



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       I am a little confused here: this implementation forwards messages from a server to its peers and then waits for the writes on the peers to finish. How is it related to reads?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507017344



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       The problem is that when `startServer` is called, the other servers might not be ready, so we cannot guarantee that we can connect to the peers (the connection fails if the other servers are not up yet).
   
   Also, I think servers are usually started one by one, so only the last server could connect to all of its peers inside its `startServer`.
   
   If you still want to hide `startClientToPeers`, then maybe we should connect lazily: when a server receives its first request, it connects to the other servers.
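
One possible shape for the lazy-connection idea is sketched below: the connections are created by a supplier on the first call to `get()` and cached afterwards. `LazyPeerConnections` and the `Supplier`-based connector are hypothetical names for illustration only; the sketch assumes the peers are reachable by the time the first request arrives and does not cover retries.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: C stands in for a peer client type, the connector for "connect to all peers".
final class LazyPeerConnections<C> {
  private final Supplier<List<C>> connector;
  private volatile List<C> clients; // stays null until the first request needs the peers

  LazyPeerConnections(Supplier<List<C>> connector) {
    this.connector = connector;
  }

  /** Connects on first use (double-checked locking on a volatile field), then returns the cached clients. */
  List<C> get() {
    List<C> local = clients;
    if (local == null) {
      synchronized (this) {
        local = clients;
        if (local == null) {
          local = connector.get();
          clients = local;
        }
      }
    }
    return local;
  }

  public static void main(String[] args) {
    LazyPeerConnections<String> peers = new LazyPeerConnections<>(() -> {
      System.out.println("connecting to peers");
      return Arrays.asList("client-to-peer-1", "client-to-peer-2");
    });
    // Nothing is connected at construction time; the first get() triggers the connector.
    System.out.println(peers.get());
    System.out.println(peers.get());
  }
}
```

Called from the request handler, this would remove the need for an explicit `startClientToPeers` call, at the cost of paying the connection latency on the first request.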







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507480100



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       Use ArrayList, then toArray at CompletableFuture.allOf(...).
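
For reference, a small sketch of the list-based variant suggested above. It relies on the typed `List.toArray(T[])` overload, which returns a `CompletableFuture<?>[]` and therefore needs no cast (the no-argument `toArray()` returns `Object[]`). The class name `AllOfFromList` and the placeholder futures are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of the patch.
final class AllOfFromList {
  /** Waits for every future in the list; the typed toArray overload avoids any cast. */
  static CompletableFuture<Void> allOf(List<CompletableFuture<?>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }

  public static void main(String[] args) {
    List<CompletableFuture<?>> writes = new ArrayList<>();
    writes.add(CompletableFuture.completedFuture(1024L));           // stand-in for the local write
    writes.add(CompletableFuture.supplyAsync(() -> "peer reply"));  // stand-in for a forwarded write

    allOf(writes).join();
    System.out.println("all writes done: " + writes.stream().allMatch(CompletableFuture::isDone));
  }
}
```

Whether a list or a fixed-size array reads better is a style choice; both end up passing a `CompletableFuture<?>[]` to `CompletableFuture.allOf`.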







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507397141



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -99,37 +104,81 @@ public WritableByteChannel getWritableByteChannel() {
       writeRequest = request;
       return CompletableFuture.completedFuture(stream);
     }
+
+    public int getByteWritten() {

Review comment:
       If you rename byteWritten to bytesWritten, you should also rename getByteWritten to getBytesWritten. But maybe we should not rename byteWritten at all; it is unnecessary.










[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which also confuses me.
   In my understanding, each request has a unique streamId. I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       if (!streamMap.containsKey(streamId)) {
         List<DataStreamOutput> streamOutputList = new ...
         for (DataStreamClientImpl client : clients) {
            streamOutputList.add(client.stream());
         }
         streamMap.put(streamId, streamOutputList);
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the streamId received on the client->server path?
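
A minimal sketch of the per-stream bookkeeping suggested above, assuming one output per peer for each streamId. `PeerOutputRegistry` and `PeerOutput` are hypothetical stand-ins, not Ratis types. `computeIfAbsent` runs the factory only when the key is missing, so the outputs for a given streamId are created once and then reused by later requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PeerOutputRegistry {
  // Hypothetical stand-in for one output stream to a peer; not a Ratis type.
  static final class PeerOutput {
    final int peerIndex;
    final long streamId;

    PeerOutput(int peerIndex, long streamId) {
      this.peerIndex = peerIndex;
      this.streamId = streamId;
    }
  }

  private final int peerCount;
  private final ConcurrentMap<Long, List<PeerOutput>> outputs = new ConcurrentHashMap<>();

  PeerOutputRegistry(int peerCount) {
    this.peerCount = peerCount;
  }

  // Returns the outputs for streamId, creating them on the first request only.
  List<PeerOutput> outputsFor(long streamId) {
    return outputs.computeIfAbsent(streamId, id -> {
      List<PeerOutput> list = new ArrayList<>(peerCount);
      for (int i = 0; i < peerCount; i++) {
        list.add(new PeerOutput(i, id));
      }
      return list;
    });
  }

  public static void main(String[] args) {
    PeerOutputRegistry registry = new PeerOutputRegistry(2);
    List<PeerOutput> first = registry.outputsFor(7L);
    List<PeerOutput> again = registry.outputsFor(7L);
    System.out.println(first == again);                 // the same list is reused
    System.out.println(registry.outputsFor(8L).size()); // one output per peer
  }
}
```

In this sketch the outputs carry the id received from the client, which is only one possible answer to the question above.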







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508125067



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       This is a very good point. Right now it indeed only keeps a consistent mapping between stream ids. There might be cases where these stream ids differ.
   
   However, I think that even with consistent stream ids across all servers, it might not be good enough for debugging. Ideally, we need a way to log the path of a request, e.g. from the client to the server and then to the other servers, together with the stream id mappings and where things went wrong. The best would then be to aggregate that information and send it back to the client.
   
   
   https://issues.apache.org/jira/browse/RATIS-1098 is partially related to this goal. Meanwhile, we might extend RATIS-1098 or open another Jira to fully achieve it.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499124095



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
##########
@@ -36,4 +38,11 @@ static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
    * Server implementation for streaming in Raft group
    */
   DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
+
+  /**
+   * Server implementation for streaming in Raft group. The server will forward requests
+   * to peers.
+   */
+  DataStreamServerRpc newDataStreamServerRpc(
+      RaftPeer server, List<RaftPeer> peers, RaftProperties properties);

Review comment:
       I have added this new API, which assumes servers can have two modes:
   1. With the peers parameter, the server forwards messages to peers (if the server is initialized by `DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, List<RaftPeer> peers, RaftProperties properties);`)
   2. Without the peers parameter, the server does not forward messages (if the server is initialized by `DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);`)







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503545313



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       @runzhiwang what do you think about the discussion here?
   
   Basically the requirements are:
   1. Forward writes to peers (which can happen in parallel).
   2. When the `Future` held by the client completes, it means not only that the write on this server has completed, but also that the writes on the other peers have completed.
   
   That's why I was thinking we might need `join` before `ctx.writeAndFlush(reply)` to wait for the other peers to finish their writes.
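
As a rough illustration of the two requirements, here is a sketch that waits for the local write and all peer writes without blocking the calling thread. It uses only `java.util.concurrent.CompletableFuture`; the class and method names are hypothetical and not part of this patch. `CompletableFuture.allOf` completes once every given future has completed, so the reply can be chained on it instead of calling `join` on each future in turn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ReplyAfterAllWrites {
  // Completes once the local write and every peer write have completed.
  static CompletableFuture<Void> whenAllWritten(
      CompletableFuture<?> localWrite, List<? extends CompletableFuture<?>> peerWrites) {
    List<CompletableFuture<?>> all = new ArrayList<>(peerWrites);
    all.add(localWrite);
    return CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0]));
  }

  public static void main(String[] args) {
    CompletableFuture<Long> local = CompletableFuture.completedFuture(4L);
    CompletableFuture<Long> peer = new CompletableFuture<>();
    StringBuilder log = new StringBuilder();

    // The reply is sent only after all writes are done.
    whenAllWritten(local, Collections.singletonList(peer))
        .thenRun(() -> log.append("reply sent"));

    System.out.println(log.length() == 0); // peer is still pending, so no reply yet
    peer.complete(4L);
    System.out.println(log);
  }
}
```

Chaining keeps the I/O thread free while the peers are still writing, whereas `join` would block it until the slowest peer answers.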
   
   
   







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508125067



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       This is a very good point. Right now it indeed only keeps a consistent mapping between stream ids. There might be cases where these stream ids differ.
   
   However, I think that even with consistent stream ids across all servers, it might not be good enough for debugging. Ideally, we need a way to log the path of a request, e.g. from the client to the server and then to the other servers, together with the stream id mappings and where things went wrong. The best would then be to aggregate that information and send it back to the client.
   
   
   https://issues.apache.org/jira/browse/RATIS-1098 is partially related to this goal. Meanwhile, we might extend RATIS-1098 or open another Jira to fully achieve it.
   
   There are still some open problems that are not handled. For example, if a request in the middle of a stream fails (while the stream is ordered), what should we do? How do we debug it, and how do we retry or fail the stream?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       I do not understand why the header future is added twice when there are 2 clients.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, streamId would always be one, which is weird.
   In my understanding, each request has a unique streamId, and it's similar to https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java. I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
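   streamMap.computeIfAbsent(streamId, id -> {
     // create the outputs to peers only once per streamId
     List<DataStreamOutput> outputs = new ArrayList<>();
     for (DataStreamClientImpl client : clients) {
       outputs.add(client.stream());
     }
     return outputs;
   });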
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499124095



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
##########
@@ -36,4 +38,11 @@ static DataStreamServerFactory cast(DataStreamFactory dataStreamFactory) {
    * Server implementation for streaming in Raft group
    */
   DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);
+
+  /**
+   * Server implementation for streaming in Raft group. The server will forward requests
+   * to peers.
+   */
+  DataStreamServerRpc newDataStreamServerRpc(
+      RaftPeer server, List<RaftPeer> peers, RaftProperties properties);

Review comment:
       I have added this new API, which assumes servers can have two modes:
   1. With the peers parameter, the server forwards messages to peers (if the server is initialized by `DataStreamServerRpc newDataStreamServerRpc(RaftPeer server, List<RaftPeer> peers, RaftProperties properties);`)
   2. Without the peers parameter, the server does not forward messages (if the server is initialized by `DataStreamServerRpc newDataStreamServerRpc(RaftPeer server);`)
   
   
   Alternatively, we can keep a single API that always exposes the `List<RaftPeer> peers`.
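
A small sketch of the single-API alternative, assuming an empty peer list means "do not forward". All names here (`SingleFactoryMethodSketch`, `StreamServer`, `newStreamServer`) are hypothetical and only illustrate the shape of such an API. They are not the Ratis interfaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SingleFactoryMethodSketch {
  // Hypothetical server handle that only records which peers it would forward to.
  static final class StreamServer {
    private final String self;
    private final List<String> peers;

    StreamServer(String self, List<String> peers) {
      this.self = self;
      this.peers = Collections.unmodifiableList(new ArrayList<>(peers));
    }

    boolean forwardsToPeers() {
      return !peers.isEmpty();
    }
  }

  // The single factory method: callers always pass a peer list, possibly empty.
  static StreamServer newStreamServer(String self, List<String> peers) {
    return new StreamServer(self, peers);
  }

  public static void main(String[] args) {
    StreamServer forwarding = newStreamServer("s0", Arrays.asList("s1", "s2"));
    StreamServer standalone = newStreamServer("s1", Collections.emptyList());
    System.out.println(forwarding.forwardsToPeers());
    System.out.println(standalone.forwardsToPeers());
  }
}
```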







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507421510



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
##########
@@ -50,6 +53,10 @@
 
     private Builder() {}
 
+    public static Builder getClientBuilder() {

Review comment:
       Renamed to `newBuilder()`

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -99,37 +104,81 @@ public WritableByteChannel getWritableByteChannel() {
       writeRequest = request;
       return CompletableFuture.completedFuture(stream);
     }
+
+    public int getByteWritten() {

Review comment:
       I see. Changed back to `byteWritten`







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499971735



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
   public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);

Review comment:
       change `public void setupClient()` to `private void setupClient()`







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r508125067



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,51 +63,55 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());

Review comment:
       This is a very good point. Right now it indeed only keeps a consistent mapping between stream ids. There might be cases where these stream ids differ.
   
   However, I think that even with consistent stream ids across all servers, it might not be good enough for debugging. Ideally, we need a way to log the path of a request, e.g. from the client to the server and then to the other servers, together with the stream id mappings and where things went wrong. The best would then be to aggregate that information and send it back to the client.
   
   
   https://issues.apache.org/jira/browse/RATIS-1098 is partially related to this goal. Meanwhile, we might extend RATIS-1098 or open another Jira to fully achieve it.
   
   There are still some open problems that are not handled. For example, if a request in the middle of a stream fails (while the stream is ordered), what should we do? How do we debug it, and how do we retry or fail the stream?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501248797



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       `startServer` starts the stream server. `startClientToPeers` builds connections to the other stream servers, which implies that `startServer` must already have been called on those servers.
   
   That's why I cannot call `startClientToPeers` inside `startServer`: the other stream servers might not have started yet.
   
   Alternatively, we could lazily initialize the clients to peers by checking whether the connection has been built when the first request arrives. But I am not sure whether this is a good idea.
   
    `startServer` can be called without calling `startClientToPeers` afterwards, which means that this stream server will not forward messages to peers.
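
The ordering constraint can be sketched with a toy model. `TwoPhaseStartup` and `Node` below are hypothetical and do no networking. They only show why every server must be listening before any server connects to its peers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoPhaseStartup {
  // Hypothetical node: "listening" models a started server, "connections" models peer clients.
  static final class Node {
    final String name;
    final List<Node> peers = new ArrayList<>();
    boolean listening;
    int connections;

    Node(String name) {
      this.name = name;
    }

    void startServer() {
      listening = true;
    }

    // Connecting to a peer fails if that peer has not started its server yet.
    void startClientToPeers() {
      for (Node peer : peers) {
        if (!peer.listening) {
          throw new IllegalStateException(peer.name + " has not started its server yet");
        }
        connections++;
      }
    }
  }

  static void startAll(List<Node> nodes) {
    nodes.forEach(Node::startServer);        // phase 1: every server starts listening
    nodes.forEach(Node::startClientToPeers); // phase 2: connect to peers
  }

  public static void main(String[] args) {
    Node s0 = new Node("s0");
    Node s1 = new Node("s1");
    s0.peers.add(s1); // only s0 forwards to a peer in this example
    startAll(Arrays.asList(s0, s1));
    System.out.println(s0.connections);
    System.out.println(s1.connections);
  }
}
```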







[GitHub] [incubator-ratis] amaliujia commented on pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-711109445


   R @szetszwo @runzhiwang 
   
   Have rebased this PR and addressed some of the important comments. Can you take a look?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507501923



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       @szetszwo @amaliujia How about using the following code? Having sendReply be in charge of buf.release() seems confusing.
   ```
   CompletableFuture.allOf(parallelWrites).whenComplete((t, r) ->  {
             buf.release();
             sendReply(request, ctx);
           });
   ```
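
To make the ownership idea concrete, here is a self-contained sketch in which the buffer is released in exactly one place, after all writes have finished, whether they succeeded or failed. `CountedBuffer` is a hypothetical stand-in for a reference-counted buffer such as Netty's `ByteBuf`, and the "FAILED" reply is an assumption made for illustration. Only the "OK" reply appears in the patch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseAfterAllWrites {
  // Hypothetical reference-counted buffer; starts with one reference.
  static final class CountedBuffer {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    void release() {
      if (refCnt.decrementAndGet() < 0) {
        throw new IllegalStateException("released twice");
      }
    }

    int refCnt() {
      return refCnt.get();
    }
  }

  // Releases the buffer once all writes are done, then produces the reply text.
  static CompletableFuture<String> replyWhenDone(CountedBuffer buf, CompletableFuture<?>... writes) {
    return CompletableFuture.allOf(writes)
        .whenComplete((ignored, error) -> buf.release())
        .handle((ignored, error) -> error == null ? "OK" : "FAILED");
  }

  public static void main(String[] args) {
    CountedBuffer buf = new CountedBuffer();
    CompletableFuture<Long> local = CompletableFuture.completedFuture(8L);
    CompletableFuture<Long> peer = new CompletableFuture<>();

    CompletableFuture<String> reply = replyWhenDone(buf, local, peer);
    System.out.println(buf.refCnt()); // still referenced while the peer write is pending
    peer.complete(8L);
    System.out.println(buf.refCnt() + " " + reply.join());
  }
}
```

Because `whenComplete` also runs on failure, the buffer is not leaked when a peer write completes exceptionally.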







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502726425



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       Besides, testDataStreamMultipleServer is not stable: sometimes it passes quickly, but sometimes it runs for a very long time.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502204193



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       Yes. I was wondering whether we need a read API to verify that a write is complete. I would appreciate suggestions here: do we need a read API to write a better test, e.g. one that uses the read API to verify that all writes have succeeded?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507949667



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       I am not doing this for two reasons:
   1. The number of peers is known and small, so a fixed-size array fits here.
   2. Java types the result of the no-argument `List.toArray()` as `Object[]` (not `CompletableFuture<?>[]`), so I would need a manual cast from `Object` to `CompletableFuture`, which is not compile-time type safe.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507485476



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -140,7 +189,7 @@ public void runTestDataStream(){
     final int bufferSize = 1024*1024;
     final int bufferNum = 10;
     final DataStreamOutput out = client.stream();
-    DataStreamClientImpl.DataStreamOutputImpl impl = (DataStreamClientImpl.DataStreamOutputImpl) out;
+    impl = (DataStreamClientImpl.DataStreamOutputImpl) out;

Review comment:
       Please revert the unnecessary change to make the code easy to review. If you want to refactor it, you can create another PR.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507392917



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }

Review comment:
       Oops. I have pushed another commit to address this one.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507420490



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
##########
@@ -50,6 +53,10 @@
 
     private Builder() {}
 
+    public static Builder getClientBuilder() {

Review comment:
       Do you mean rename to `newBuilder()`?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503645201



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       @amaliujia @szetszwo's point is that one big write request consists of many sub-requests. For example, when the client wants to write 640KB of data, it splits the write into 10 sub-requests of 64KB each. When server1 receives [0, 64KB), it forwards [0, 64KB) to server2 and server3. If we call `future.join()` here, server1 cannot read [64KB, 128KB) until it has finished forwarding [0, 64KB), which delays the streaming.
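
   The concern about `future.join()` can be illustrated with a minimal, self-contained sketch. This is not the Ratis code: `FakePeer`, `forward` and `handleChunk` are hypothetical names, with `forward` standing in for `streamOutput.streamAsync(...)`. The handler chains the reply onto the peer acknowledgements instead of waiting for them, so the next chunk can be accepted while the previous one is still being forwarded.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class NonBlockingForward {
     /** Hypothetical stand-in for a peer connection; acks complete only when told to. */
     static final class FakePeer {
       final List<CompletableFuture<Void>> pendingAcks = new ArrayList<>();

       /** Plays the role of streamAsync(...): returns at once with a pending future. */
       CompletableFuture<Void> forward(String chunk) {
         final CompletableFuture<Void> ack = new CompletableFuture<>();
         pendingAcks.add(ack);
         return ack;
       }
     }

     /**
      * Handles one chunk without blocking the caller. The returned reply future
      * completes only after every peer has acknowledged the chunk.
      */
     static CompletableFuture<String> handleChunk(String chunk, List<FakePeer> peers) {
       final CompletableFuture<?>[] acks = new CompletableFuture<?>[peers.size()];
       for (int i = 0; i < peers.size(); i++) {
         acks[i] = peers.get(i).forward(chunk);
       }
       // No join(): the reply is chained onto the acks instead of waited for.
       return CompletableFuture.allOf(acks).thenApply(ignored -> "reply:" + chunk);
     }
   }
   ```

   In this sketch, two chunks can be handed to `handleChunk` back to back even though no peer has acknowledged the first one yet; each reply completes independently once its own acknowledgements arrive.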







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507480100



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];

Review comment:
       Use an `ArrayList` and call `toArray` when passing it to `CompletableFuture.allOf(...)`, so that the size does not need to be defined up front.
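
   A minimal sketch of this suggestion, assuming nothing about the actual Ratis classes (`AllOfFromList` and `combine` are hypothetical names): the futures are collected in a list, and the typed overload `toArray(T[])` yields a `CompletableFuture<?>[]` directly, so no cast from `Object[]` is involved.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;

   public class AllOfFromList {
     /** Combines a local write with any number of peer writes; no size is declared up front. */
     static CompletableFuture<Void> combine(CompletableFuture<?> localWrite,
                                            List<? extends CompletableFuture<?>> peerWrites) {
       final List<CompletableFuture<?>> parallelWrites = new ArrayList<>();
       parallelWrites.add(localWrite);
       parallelWrites.addAll(peerWrites);
       // The typed overload returns CompletableFuture<?>[], so no cast is needed.
       final CompletableFuture<?>[] array = parallelWrites.toArray(new CompletableFuture<?>[0]);
       return CompletableFuture.allOf(array);
     }
   }
   ```

   Whether the list or the fixed-size array reads better is a matter of taste when the peer count is small; the list version mainly avoids the `clients.size() + 1` bookkeeping.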







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501260440



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }

Review comment:
       will address this comment later.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499948036



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       Maybe `clients` is enough and we do not need `streams`, which is confusing. When we need a stream, we can get it from `client.stream()`.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502136174



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think this test does not cover "forward the data to the other servers". If you comment out the following code, the test still passes.
   ```
         // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputs) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ```
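
   One way to close this gap is to assert on what every server received, not only on the reply seen by the client. The sketch below is a self-contained illustration with hypothetical names (`FakeServer`, `allServersReceived`); it does not use the Ratis test classes. It shows a check that passes when forwarding is enabled and fails when the forwarding loop is disabled.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;

   public class ForwardingCheck {
     /** Hypothetical server that counts received bytes and optionally forwards to peers. */
     static final class FakeServer {
       final AtomicLong bytesReceived = new AtomicLong();
       final List<FakeServer> peers = new ArrayList<>();
       final boolean forwardingEnabled;

       FakeServer(boolean forwardingEnabled) {
         this.forwardingEnabled = forwardingEnabled;
       }

       void receive(byte[] data) {
         bytesReceived.addAndGet(data.length);
         if (forwardingEnabled) {
           // forward requests to other stream servers.
           for (FakeServer peer : peers) {
             peer.receive(data);
           }
         }
       }
     }

     /** The assertion a multi-server test needs: every server saw all the bytes. */
     static boolean allServersReceived(List<FakeServer> servers, long expectedBytes) {
       return servers.stream().allMatch(s -> s.bytesReceived.get() == expectedBytes);
     }
   }
   ```

   In the real test the per-server byte count would have to come from each server's state machine or a similar hook, which is an assumption about how the test is wired.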







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507015950



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       > startServer can be called without calling startClientToPeers afterwards, which means that this stream will not forward message to peers.
   
   I think `startClientToPeers` should be moved into `startServer`. A stream that does not forward messages to peers still works, because in that case `clients` is empty.
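
   A minimal sketch of this proposal, with hypothetical names (`StartServerSketch`, `peerAddresses`, `startedClients`) and plain strings in place of real clients. It ignores startup ordering, i.e. it assumes the clients do not need their peers to be listening at the moment they are started, which may not hold for the real implementation.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class StartServerSketch {
     private final List<String> peerAddresses;                      // hypothetical peer list
     private final List<String> startedClients = new ArrayList<>(); // one entry per started client
     private boolean serverStarted;

     StartServerSketch(List<String> peerAddresses) {
       this.peerAddresses = new ArrayList<>(peerAddresses);
     }

     /** Single entry point: starts the server, then the clients to its peers. */
     void startServer() {
       serverStarted = true;
       startClientToPeers();
     }

     /** With an empty peer list the loop body never runs, so nothing is forwarded. */
     private void startClientToPeers() {
       for (String address : peerAddresses) {
         startedClients.add("client->" + address);
       }
     }

     boolean isServerStarted() {
       return serverStarted;
     }

     int startedClientCount() {
       return startedClients.size();
     }
   }
   ```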







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, streamId is always one, which is weird.
   In my understanding, each request has a unique streamId, similar to [DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java). I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, streamId is always one, which also confused me.
   In my understanding, each request has a unique streamId. I think we can change the patch along the lines of the following pseudocode:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       if (!streamMap.containsKey(streamId)) {
         streamMap.put(streamId, new ArrayList<>());
         for (DataStreamClientImpl client : clients) {
            streamMap.get(streamId).add(client.stream());
         }
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
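
   The `containsKey`/`put` pair in the pseudocode above is not atomic. A minimal sketch of the same per-stream bookkeeping using `ConcurrentHashMap.computeIfAbsent` follows. The class `PerStreamOutputs` is hypothetical, and each `Supplier<O>` stands in for a client's `stream()` call; this is an illustration of the idea, not the code that was merged.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.function.Supplier;

   public class PerStreamOutputs<O> {
     private final ConcurrentMap<Long, List<O>> outputsByStream = new ConcurrentHashMap<>();
     private final List<Supplier<O>> clients;

     PerStreamOutputs(List<Supplier<O>> clients) {
       this.clients = new ArrayList<>(clients);
     }

     /**
      * Returns the peer outputs for the given stream, opening one output per
      * client the first time the stream id is seen and reusing them afterwards.
      */
     List<O> outputsFor(long streamId) {
       return outputsByStream.computeIfAbsent(streamId, id -> {
         final List<O> outputs = new ArrayList<>(clients.size());
         for (Supplier<O> client : clients) {
           outputs.add(client.get());
         }
         return outputs;
       });
     }
   }
   ```

   With this shape, every request carrying the same stream id reuses the same outputs, and a new stream id opens a fresh set.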
   
   
   And one more question: when streaming between servers, should we use the streamId received from the client (client->server)?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       Why is the header future added 2 times when there are 2 clients?







[GitHub] [incubator-ratis] runzhiwang commented on pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-712546827


   @amaliujia Thanks for the patch. @szetszwo Thanks for the review. I have merged it.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499971513



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);

Review comment:
       Change `public void setupServer()` to `private void setupServer()`.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, streamId would always be one, which is also weird.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507456225



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
##########
@@ -50,6 +53,10 @@
 
     private Builder() {}
 
+    public static Builder newBuilder() {

Review comment:
       done.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +135,30 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();

Review comment:
       removed.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502275647



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I think we can do `channel.write(buffer);` and `streamOutput.streamAsync(buf.nioBuffer());` in parallel, since `channel.write(buffer)` may be slow when writing a file in Ozone. I am not sure, though; this is just for discussion.
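
A minimal sketch of the parallel variant suggested above, assuming the local write and each peer forward can be modeled as a `CompletableFuture`. `PeerOutput`, `writeAll`, and the I/O pool are hypothetical names used only for illustration; they are not the Ratis or Netty API, and the local write is simulated by returning the byte count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelWriteSketch {
  /** Hypothetical stand-in for a stream to one peer; not the Ratis DataStreamOutput API. */
  interface PeerOutput {
    CompletableFuture<Integer> sendAsync(byte[] data);
  }

  /** Starts the local write and every peer write without waiting on any of them. */
  static CompletableFuture<Integer> writeAll(byte[] data, List<PeerOutput> peers, ExecutorService ioPool) {
    final List<CompletableFuture<Integer>> writes = new ArrayList<>();
    // The (possibly slow) local write runs on its own pool; here it only reports the byte count.
    writes.add(CompletableFuture.supplyAsync(() -> data.length, ioPool));
    for (PeerOutput peer : peers) {
      writes.add(peer.sendAsync(data));
    }
    // Completes once all writes are done; the result is the total number of bytes acknowledged.
    return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]))
        .thenApply(ignored -> writes.stream().mapToInt(f -> f.join()).sum());
  }

  /** One local write plus two simulated peers, 4 bytes each. */
  static int demo() {
    final ExecutorService ioPool = Executors.newFixedThreadPool(2);
    try {
      final PeerOutput peer = data -> CompletableFuture.completedFuture(data.length);
      return writeAll(new byte[4], Arrays.asList(peer, peer), ioPool).join();
    } finally {
      ioPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("total bytes acknowledged: " + demo());
  }
}
```

With this shape, a slow local write no longer delays the forwarding, because all writes are started before any of them is awaited.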







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502207309



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think we need it. @szetszwo What do you think?







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499194565



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       startServer should take care of the initial setup.  Would there be a case where startServer is called but startClientToPeers is not?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       Read should not wait for the future; otherwise it will slow down reads.  We should have another thread pool to wait, handle exceptions/retries, and send replies.
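
A minimal sketch of the non-blocking shape described above, assuming the peer writes are available as `CompletableFuture`s. `replyWhenDone` and `replyPool` are hypothetical names, the reply is reduced to a string, and retries are left out; this is not the code in the patch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NonBlockingReplySketch {
  /**
   * Returns immediately. The reply text is produced on replyPool once every
   * peer write has finished, successfully or not.
   */
  static CompletableFuture<String> replyWhenDone(List<CompletableFuture<Long>> peerWrites, Executor replyPool) {
    return CompletableFuture.allOf(peerWrites.toArray(new CompletableFuture<?>[0]))
        .handleAsync((ignored, error) -> {
          if (error == null) {
            return "OK";
          }
          // allOf wraps the failure in a CompletionException; report the underlying cause.
          final Throwable cause = error.getCause() != null ? error.getCause() : error;
          return "FAILED: " + cause.getMessage();
        }, replyPool);
  }

  /** Exercises the pending, successful, and failed cases. */
  static String demo() {
    final ExecutorService replyPool = Executors.newSingleThreadExecutor();
    try {
      final CompletableFuture<Long> done = CompletableFuture.completedFuture(8L);
      final CompletableFuture<Long> pending = new CompletableFuture<>();
      final CompletableFuture<String> reply = replyWhenDone(Arrays.asList(done, pending), replyPool);
      final boolean stillWaiting = !reply.isDone(); // the caller was not blocked
      pending.complete(8L);

      final CompletableFuture<Long> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("peer down"));
      final String failure = replyWhenDone(Arrays.asList(done, failed), replyPool).join();

      return stillWaiting + " / " + reply.join() + " / " + failure;
    } finally {
      replyPool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The read handler would only call something like `replyWhenDone` and return; waiting and error handling happen on the separate pool.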







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, streamId would always be one, which is weird.
   In my understanding, each request has a unique streamId, similar to https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java. I think we can change it as in the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
   streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
   for (DataStreamClientImpl client : clients) {
     streamMap.get(streamId).add(client.stream());
   }
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
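
A self-contained sketch of the per-stream map idea above, assuming the peer outputs should be opened once per stream id and reused for later requests on that stream. `PeerClient`, `openStream`, and `outputsFor` are hypothetical stand-ins (the outputs are plain strings here), not the `DataStreamClientImpl` or `DataStreamOutput` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class PerStreamOutputsSketch {
  /** Hypothetical stand-in for a client connected to one peer. */
  interface PeerClient {
    String openStream(long streamId);
  }

  private final List<PeerClient> clients;
  private final ConcurrentMap<Long, List<String>> outputsByStream = new ConcurrentHashMap<>();

  PerStreamOutputsSketch(List<PeerClient> clients) {
    this.clients = clients;
  }

  /** Opens one output per peer the first time a stream id is seen, then reuses them. */
  List<String> outputsFor(long streamId) {
    return outputsByStream.computeIfAbsent(streamId, id -> {
      final List<String> outputs = new ArrayList<>();
      for (PeerClient client : clients) {
        outputs.add(client.openStream(id));
      }
      return outputs;
    });
  }

  /** Two peers, stream 1 seen twice and stream 2 once: returns the number of opened outputs. */
  static int demo() {
    final AtomicInteger opened = new AtomicInteger();
    final PeerClient client = id -> "stream-" + id + "-output-" + opened.incrementAndGet();
    final PerStreamOutputsSketch sketch = new PerStreamOutputsSketch(Arrays.asList(client, client));
    sketch.outputsFor(1L);
    sketch.outputsFor(1L); // reuses the outputs created by the first call
    sketch.outputsFor(2L);
    return opened.get();
  }

  public static void main(String[] args) {
    System.out.println("outputs opened: " + demo());
  }
}
```

Creating the outputs inside the `computeIfAbsent` mapping function avoids adding a new set of peer streams on every `channelRead` call for the same stream id.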
   
   
   







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507009684



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       Now I use the `byteWritten` counter of `SingleDataStreamStateMachine` to verify that every server has the same amount of data written.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501256086



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       My current understanding is that `ctx.writeAndFlush(reply);` completes the future on the client side, so before running it we need to wait for the writes on the other peers to complete. That's why I was thinking we need some sort of `future.join` before `ctx.writeAndFlush(reply);`.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503545313



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       @runzhiwang what do you think about the discussion here?
   
   Basically, the requirements are:
   1. Forward writes to peers (which can happen in parallel).
   2. The `Future` held by the client completes only when the write on this server and the writes on the other peers have all completed.
   
   That's why I was thinking we might need `join` before `ctx.writeAndFlush(reply)` to wait for the other peers to finish their writes.
   
   
   







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507440176



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +135,30 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final AtomicBoolean shouldRelease = new AtomicBoolean();

Review comment:
       shouldRelease is no longer needed.  Please remove it.

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java
##########
@@ -50,6 +53,10 @@
 
     private Builder() {}
 
+    public static Builder newBuilder() {

Review comment:
       Move it to DataStreamClient.

##########
File path: ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.RaftAsyncTests;
+
+public class TestRaftAsyncWithNetty extends RaftAsyncTests<MiniRaftClusterWithNetty>

Review comment:
       Wrong file?







[GitHub] [incubator-ratis] amaliujia commented on pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-703059304


   R: @szetszwo @runzhiwang 
   
   This is the first version of the implementation of a stream server forwarding messages to its peers. I am sure this version needs a lot of improvement, and I would like to hear your feedback and suggestions.
   
   I will also rebase this PR once #212 is merged.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       Why add 2 header futures? We do not forward the header to the other 2 servers.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507957739



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       Thanks! Done.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507487483



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       I do not understand why buf.release() was moved from the writeTo method to sendReply.
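
One possible reading, sketched here under assumptions rather than taken from the patch: if the same buffer feeds both the local write and the forwards to the peers, it can only be released after every consumer has finished. The Java sketch below uses a hypothetical `CountedBuffer` in place of Netty's `ByteBuf`, and `releaseWhenDone` is an illustrative name. It releases the buffer once all writes have completed, including when one of them fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted buffer; this is not Netty's ByteBuf.
final class CountedBuffer {
  private final AtomicInteger refCnt = new AtomicInteger(1);

  int refCnt() {
    return refCnt.get();
  }

  void release() {
    if (refCnt.decrementAndGet() < 0) {
      throw new IllegalStateException("buffer released more than once");
    }
  }
}

final class ReleaseAfterWrites {
  private ReleaseAfterWrites() {}

  // Releases buf once, after every write has completed, whether normally or with an error.
  static CompletableFuture<Void> releaseWhenDone(CountedBuffer buf, CompletableFuture<?>... writes) {
    return CompletableFuture.allOf(writes).whenComplete((ignored, error) -> buf.release());
  }
}
```

Compared with a `try`/`finally` inside the reply method, `whenComplete` also runs when a write fails and the reply step is skipped, so the buffer would not leak on that path.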







[GitHub] [incubator-ratis] amaliujia commented on pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-711465991


   @szetszwo @runzhiwang 
   
   I have addressed all actionable comments so far. Can you take a look?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502097596



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? If, with the implementation in this patch, the streamId is always one, that seems odd.
   In my understanding, each request has a unique streamId, similar to [DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java). I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we reuse the streamId received in the client->server request?
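
The per-stream map suggested above could be sketched roughly as follows. This is a minimal, hypothetical Java example and not the Ratis API: `PeerOutputs`, `outputsFor`, and the `Supplier`-based peer clients are assumed names standing in for `DataStreamClient.stream()`. The outputs for a stream id are opened inside the `computeIfAbsent` mapping function, so they are created the first time the id is seen and reused afterwards.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch; O stands in for a per-peer output such as a data stream output.
final class PeerOutputs<O> {
  private final ConcurrentMap<Long, List<O>> outputsByStream = new ConcurrentHashMap<>();
  private final List<Supplier<O>> peerClients;

  PeerOutputs(List<Supplier<O>> peerClients) {
    this.peerClients = peerClients;
  }

  // Opens one output per peer the first time a stream id is seen and reuses them afterwards.
  List<O> outputsFor(long streamId) {
    return outputsByStream.computeIfAbsent(streamId,
        id -> peerClients.stream().map(Supplier::get).collect(Collectors.toList()));
  }
}
```

With `ConcurrentHashMap`, the mapping function runs at most once per key, which avoids opening a fresh set of outputs on every incoming request.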







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502275647



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I think we can do `channel.write(buffer);` and `streamOutput.streamAsync(buf.nioBuffer());` in parallel, since `channel.write(buffer)` may be slow when writing to a file. I am not sure, though; this is just for discussion.
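
A rough Java sketch of the idea, under assumptions: `ParallelWrite`, `writeAndForward`, and the two suppliers are hypothetical names, with `localWrite` standing in for the blocking `channel.write(buffer)` and `forward` for the already asynchronous `streamAsync(...)` call. The forward is started first so that it overlaps with the potentially slow local write, and the result completes only when both are done.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Illustrative only: these names are not from the Ratis code base.
final class ParallelWrite {
  private ParallelWrite() {}

  // Completes with the smaller of the two byte counts once both writes have finished.
  static CompletableFuture<Long> writeAndForward(
      Supplier<Long> localWrite, Supplier<CompletableFuture<Long>> forward, Executor executor) {
    // Kick off the asynchronous forward before the blocking local write starts.
    final CompletableFuture<Long> forwarded = forward.get();
    // Run the blocking local write on the given executor.
    final CompletableFuture<Long> local = CompletableFuture.supplyAsync(localWrite, executor);
    return local.thenCombine(forwarded, Long::min);
  }
}
```

Whether this helps in practice depends on where the local write runs; doing blocking file I/O on a Netty event-loop thread would still stall that thread, so a separate executor is assumed here.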







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507025165



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -97,33 +98,44 @@ public WritableByteChannel getWritableByteChannel() {
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
-      writeRequest = request;
+      if (request.getClientId().equals(impl.getHeader().getClientId())) {

Review comment:
       You can put writeRequest in SingleDataStreamStateMachine as a field.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
       Why add the header future 2 times? We do not forward the header to the other 2 servers.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507668806



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       Sure, it looks good.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507381679



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -37,34 +39,68 @@
 
 public class TestDataStream extends BaseTest {
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
 
   public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], properties, null);
-    server.getServerRpc().startServer();
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null, otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer =
+            new DataStreamServerImpl(peers.get(i), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }

Review comment:
       This has not been addressed.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r501249518



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       I am a little confused here: this implementation forwards messages from a server to its peers, waits for the `writes` on the peers to finish, and then replies to complete the current write (the client also holds a future, so the reply completes that client's future).
   
   How is it related to read?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507009782



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       I have to use `allOf` to make sure that both the local write and the writes to the peers succeed before sending a reply.
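
The `allOf` approach can be sketched as below. This is a minimal Java example under assumptions, not the code from the patch: `ReplyGate`, `replyAfterAll`, and the `sendReply` callback are hypothetical names. The reply runs only after the local write and every peer write have completed normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only: the class and method names are not from the Ratis code base.
final class ReplyGate {
  private ReplyGate() {}

  // Runs sendReply only after the local write and every peer write have completed normally.
  static CompletableFuture<Void> replyAfterAll(
      CompletableFuture<?> localWrite,
      List<? extends CompletableFuture<?>> peerWrites,
      Runnable sendReply) {
    final List<CompletableFuture<?>> all = new ArrayList<>(peerWrites);
    all.add(localWrite);
    // allOf completes once every future is done and fails if any of them failed,
    // in which case thenRun is skipped and no reply is sent.
    return CompletableFuture.allOf(all.toArray(new CompletableFuture<?>[0])).thenRun(sendReply);
  }
}
```

Unlike calling `join()` on each future inside `channelRead`, this does not block the calling thread while the peers are still writing.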







[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-711109445


   R @szetszwo @runzhiwang 
   
   I have rebased this PR and addressed some of the important comments. The goal is to make this PR functionally correct first. Can you take a look?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
        Why is the header future added twice?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502093573



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -140,6 +160,14 @@ ChannelFuture buildChannel() {
         .bind();
   }
 
+  public void setupClient(List<RaftPeer> otherPeers, RaftProperties properties) {

Review comment:
       public -> private

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think this test does not cover "forward the data to the other servers". If you comment out the following code, this test can still pass.
   ```
         // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputs) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ```
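
One way to make a test sensitive to the forwarding step is to assert on what every server received, not only the one the client talks to. The sketch below uses hypothetical in-memory stand-ins (`FakeServer`, `allServersReceived`) rather than the real test classes, so it only illustrates the shape of the check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory stand-ins; these are not the Ratis test classes.
class ForwardingCheckSketch {
  static class FakeServer {
    final AtomicLong bytesReceived = new AtomicLong();
    final List<FakeServer> peers = new ArrayList<>();

    void receive(byte[] data) {
      bytesReceived.addAndGet(data.length);
      // The forwarding step under test: without it, the peers stay at zero.
      for (FakeServer peer : peers) {
        peer.receive(data);
      }
    }
  }

  /** Returns true only if every server, not just the primary, saw all the bytes. */
  static boolean allServersReceived(List<FakeServer> servers, long expectedBytes) {
    return servers.stream().allMatch(s -> s.bytesReceived.get() == expectedBytes);
  }
}
```

With a check like this, removing the forwarding loop leaves the peer counters at zero and the assertion fails.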

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       I think we need this. @szetszwo What do you think?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I think we can do `channel.write(buffer);` and `streamOutput.streamAsync(buf.nioBuffer());` in parallel, because `channel.write(buffer)` may be slow when writing a file in Ozone. I am not sure, though; this is just for discussion.
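
A rough sketch of running the two in parallel, assuming the local write can be wrapped in a `Supplier` and the forward already returns a future. `localWrite` and `forward` are hypothetical placeholders for `channel.write` and `streamAsync`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch; localWrite and forward stand in for channel.write and streamAsync.
class ParallelWriteSketch {
  /** Starts both operations without waiting for either, then sums the bytes they report. */
  static CompletableFuture<Long> writeAndForward(
      Supplier<Long> localWrite, Supplier<CompletableFuture<Long>> forward) {
    // Start the forward first so a slow local write does not delay it.
    final CompletableFuture<Long> forwarded = forward.get();
    // Run the possibly slow local write on the common pool instead of the caller's thread.
    final CompletableFuture<Long> local = CompletableFuture.supplyAsync(localWrite);
    return local.thenCombine(forwarded, Long::sum);
  }
}
```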

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which also seems odd.
   In my understanding, each request has a unique streamId.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which seems odd.
   In my understanding, each request has a unique streamId, similar to [DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java). I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ...
   streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
   for (DataStreamClientImpl client : clients) {
     streamMap.get(streamId).add(client.stream());
   }
   streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which seems odd.
   In my understanding, each request has a unique streamId, similar to [DataStreamClientImpl](https://github.com/apache/incubator-ratis/blob/master/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java). I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the streamId received from the client?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();

Review comment:
       @szetszwo Could you help explain this? With the implementation in this patch, the streamId would always be one, which seems odd.
   In my understanding, each request has a unique streamId. I think we can change the patch along the lines of the following code:
   `private Map<Long, List<DataStreamOutput>> streamMap = new ...;`
   
   ```
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
       ...
       streamMap.computeIfAbsent(streamId, id -> new ArrayList<>());
       for (DataStreamClientImpl client : clients) {
          streamMap.get(streamId).add(client.stream());
       }
       streams.computeIfAbsent(streamId, id -> getDataStreamFuture(buf, released))
               .thenAccept(stream -> writeTo(buf, streamMap.get(streamId), stream, released.get()))
               .thenAccept(dummy -> sendReply(req, ctx));
   }
   ```
   ```
   private void writeTo(ByteBuf buf, List<DataStreamOutput> streamOutputList,  DataStream stream, boolean released) {
   ...
       // forward requests to other stream servers.
         for (DataStreamOutput streamOutput : streamOutputList) {
           streamOutput.streamAsync(buf.nioBuffer());
         }
   ...
   }
   ```
   
   
   One more question: when streaming between servers, should we use the streamId received from the client?
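
As a self-contained illustration of the per-stream map suggested above, here is a sketch that opens one output per peer the first time a streamId is seen and reuses that list afterwards. The type parameter `O` stands in for `DataStreamOutput` and each `Supplier` for a call like `client.stream()`; both are assumptions made so the example compiles without Ratis.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch; O stands in for DataStreamOutput and each Supplier for client.stream().
class PerStreamOutputsSketch<O> {
  private final ConcurrentMap<Long, List<O>> outputsByStream = new ConcurrentHashMap<>();
  private final List<Supplier<O>> peerClients;

  PerStreamOutputsSketch(List<Supplier<O>> peerClients) {
    this.peerClients = peerClients;
  }

  /** Opens one output per peer the first time a streamId is seen, then reuses the same list. */
  List<O> outputsFor(long streamId) {
    return outputsByStream.computeIfAbsent(streamId,
        id -> peerClients.stream().map(Supplier::get).collect(Collectors.toList()));
  }
}
```

Creating the outputs inside the mapping function, rather than in a separate loop on every `channelRead`, keeps the list from growing with each packet of the same stream.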







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507487483



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -58,55 +63,63 @@
 
   private final StateMachine stateMachine;
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
+  private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
+
+  private List<DataStreamClient> clients = new ArrayList<>();
 
   public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
     this.stateMachine = stateMachine;
     this.channelFuture = buildChannel();
   }
 
-  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers,
+      StateMachine stateMachine, RaftProperties properties){
+    this(server, stateMachine);
+    setupClient(otherPeers, properties);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() {
+    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf) {
     try {
       final RaftClientRequest request =
           ClientProtoUtils.toRaftClientRequest(RaftProtos.RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       return stateMachine.data().stream(request);
     } catch (InvalidProtocolBufferException e) {
       throw new CompletionException(e);
-    } finally {
-      buf.release();
-      released.set(true);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream, boolean released) {
-    if (released) {
+  private long writeTo(ByteBuf buf, DataStream stream) {
+    if (stream == null) {
       return 0;
     }
-    try {
-      if (stream == null) {
-        return 0;
-      }
 
-      final WritableByteChannel channel = stream.getWritableByteChannel();
-      long byteWritten = 0;
-      for (ByteBuffer buffer : buf.nioBuffers()) {
-        try {
-          byteWritten += channel.write(buffer);
-        } catch (Throwable t) {
-          throw new CompletionException(t);
-        }
+    final WritableByteChannel channel = stream.getWritableByteChannel();
+    long byteWritten = 0;
+    for (ByteBuffer buffer : buf.nioBuffers()) {
+      try {
+        byteWritten += channel.write(buffer);
+      } catch (Throwable t) {
+        throw new CompletionException(t);
       }
-      return byteWritten;
-    } finally {
-      buf.release();
     }
+    return byteWritten;
   }
 
-  private void sendReply(DataStreamRequestByteBuf request, long byteWritten, ChannelHandlerContext ctx) {
+  private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf) {
     // TODO RATIS-1098: include byteWritten and isSuccess in the reply
-    final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
-        request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
-    ctx.writeAndFlush(reply);
+    try {
+      final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
+          request.getStreamId(), request.getStreamOffset(), ByteBuffer.wrap("OK".getBytes()));
+      ctx.writeAndFlush(reply);
+    } finally {
+      buf.release();

Review comment:
       Why move `buf.release()` from the `writeTo` method to `sendReply`?
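
For context, a sketch of the general pattern in which a buffer shared by several asynchronous consumers is released once, after all of them finish. This is not confirmed in the thread as the reason for the move. `SharedBuffer` is a hypothetical class that only imitates a reference-counted buffer such as Netty's `ByteBuf`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; SharedBuffer only imitates a reference-counted buffer.
class ReleaseAfterAllSketch {
  static class SharedBuffer {
    final AtomicInteger refCnt = new AtomicInteger(1);

    void release() {
      if (refCnt.decrementAndGet() < 0) {
        throw new IllegalStateException("released twice");
      }
    }
  }

  /** Releases the buffer exactly once, after every consumer has finished, whether or not one failed. */
  static CompletableFuture<Void> releaseWhenDone(
      SharedBuffer buf, List<CompletableFuture<?>> consumers) {
    return CompletableFuture.allOf(consumers.toArray(new CompletableFuture<?>[0]))
        .whenComplete((ignored, error) -> buf.release());
  }
}
```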







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507017344



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerRpc.java
##########
@@ -27,6 +27,11 @@
    */
   void startServer();
 
+  /**
+   * start clients that used to forward requests to peers.
+   */
+  void startClientToPeers();

Review comment:
       The problem is that when `startServer` is called, the other servers might not be ready yet, so we cannot guarantee that we can connect to the peers.
   
   If you still want to hide `startClientToPeers`, then maybe we should connect lazily: connect to the other servers when the first request is received.
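
A minimal sketch of such a lazy connection, assuming the connection step can be passed in as a `Runnable`. `LazyPeerConnectionSketch` and `ensureConnected` are hypothetical names, and `connect` stands in for whatever `startClientToPeers` does.

```java
// Hypothetical sketch; connect stands in for whatever startClientToPeers() does.
class LazyPeerConnectionSketch {
  private final Runnable connect;
  private boolean connected;

  LazyPeerConnectionSketch(Runnable connect) {
    this.connect = connect;
  }

  /**
   * Called at the start of every request. The first caller runs the connection step;
   * concurrent callers wait on the lock until it is done.
   */
  synchronized void ensureConnected() {
    if (!connected) {
      connect.run();
      // Set only after connect succeeds, so a failed attempt is retried on the next request.
      connected = true;
    }
  }
}
```

The trade-off is that the first request pays the connection latency, and every request takes a lock, which may or may not matter for the stream path.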







[GitHub] [incubator-ratis] runzhiwang merged pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang merged pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213


   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
        Why add the header future twice? We do not forward the header to the other 2 servers, so we only need to add the header future of the current server.
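
One way to express this, sketched with plain `CompletableFuture`s and a hypothetical `WriteFutures.collect` helper (not part of the patch): gather the futures in a list and size the array from that list. A header request then contributes only the local future, and no slot is left unfilled. This matters because `CompletableFuture.allOf` throws a `NullPointerException` if any array element is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WriteFutures {
  /**
   * Collects the futures to wait on for one request.
   * A header request contributes only the local future; a data request
   * also contributes one future per peer.
   */
  static CompletableFuture<Void> collect(boolean isHeader,
      CompletableFuture<?> localWrite, List<CompletableFuture<?>> peerWrites) {
    final List<CompletableFuture<?>> futures = new ArrayList<>();
    futures.add(localWrite);
    if (!isHeader) {
      futures.addAll(peerWrites);
    }
    // The array is sized from the list, so allOf never sees a null slot.
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
  }
}
```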







[GitHub] [incubator-ratis] amaliujia commented on pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#issuecomment-705136295


   Now that RATIS-1081 is merged, I have rebased and started to address the comments.





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to other server in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r499123983



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       At this moment, I am not sure how this server should handle exceptions when forwarding messages to peers:
   1. Can any exception be thrown at all? I ask because the existing request-handling code does not seem to throw any exception.
   2. If this server fails to forward a message to a peer, what is the proper way to respond to the client? Note that the client is sending a stream of messages, so if a failure happens in the middle, should the client retry from the failure point?
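
On the first question, as far as plain `CompletableFuture` semantics go: if a forwarded future completes exceptionally, `join()` throws an unchecked `CompletionException` wrapping the cause. The sketch below shows that behavior, plus a non-blocking alternative that maps any failure to a reply value. `ForwardResult`, `replyFor`, `joinThrows` and the `OK`/`FAILED` strings are hypothetical; the patch itself replies with `DataStreamReply`, and the sketch does not settle how the client should retry.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ForwardResult {
  /** Hypothetical reply values used only in this sketch. */
  static final String OK = "OK";
  static final String FAILED = "FAILED";

  /** Waits for all forwards without blocking and maps any failure to a FAILED reply. */
  static CompletableFuture<String> replyFor(List<CompletableFuture<?>> forwards) {
    return CompletableFuture.allOf(forwards.toArray(new CompletableFuture<?>[0]))
        .handle((ignored, error) -> error == null ? OK : FAILED);
  }

  /** Shows what a blocking join() does when the forward has failed. */
  static boolean joinThrows(CompletableFuture<?> forward) {
    try {
      forward.join();
      return false;
    } catch (CompletionException e) {
      // e.getCause() is the original failure, for example an IOException.
      return true;
    }
  }
}
```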







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507482960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -115,10 +128,29 @@ private ChannelInboundHandler getServerHandler(){
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
-        final AtomicBoolean released = new AtomicBoolean();
-        streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf, released))
-            .thenApply(stream -> writeTo(buf, stream, released.get()))
-            .thenAccept(byteWritten -> sendReply(request, byteWritten, ctx));
+        final boolean isHeader = request.getStreamOffset() == -1;
+
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+
+        final CompletableFuture<?> localWrites = isHeader?
+                streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
+                : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
+        parallelWrites[0] = localWrites;
+        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
+
+          // do not need to forward header request
+        if (isHeader) {
+          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
+            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          }

Review comment:
        Why add the header future twice when there are 2 clients?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r502204193



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -100,34 +101,68 @@ public WritableByteChannel getWritableByteChannel() {
     }
   }
 
-  private RaftPeer[] peers;
+  private List<RaftPeer> peers;
   private RaftProperties properties;
-  private DataStreamServerImpl server;
+  private List<DataStreamServerImpl> servers;
   private DataStreamClientImpl client;
   private int byteWritten = 0;
 
-  public void setupServer(){
-    server = new DataStreamServerImpl(peers[0], new SingleDataStreamStateMachine(), properties, null);
-    server.getServerRpc().startServer();
+  private void setupServer(){
+    servers = new ArrayList<>(peers.size());
+    // start stream servers on raft peers.
+    for (int i = 0; i < peers.size(); i++) {
+      if (i == 0) {
+        // only the first server routes requests to peers.
+        List<RaftPeer> otherPeers = new ArrayList<>(peers);
+        otherPeers.remove(peers.get(i));
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), properties, null, new SingleDataStreamStateMachine(), otherPeers);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      } else {
+        DataStreamServerImpl streamServer = new DataStreamServerImpl(
+            peers.get(i), new SingleDataStreamStateMachine(), properties, null);
+        servers.add(streamServer);
+        streamServer.getServerRpc().startServer();
+      }
+    }
+
+    // start peer clients on stream servers
+    for (DataStreamServerImpl streamServer : servers) {
+      streamServer.getServerRpc().startClientToPeers();
+    }
   }
 
-  public void setupClient(){
-    client = new DataStreamClientImpl(peers[0], properties, null);
+  private void setupClient(){
+    client = new DataStreamClientImpl(peers.get(0), properties, null);
     client.start();
   }
 
   public void shutDownSetup(){
     client.close();
-    server.close();
+    servers.stream().forEach(s -> s.close());
   }
 
   @Test
   public void testDataStream(){
     properties = new RaftProperties();
     peers = Arrays.stream(MiniRaftCluster.generateIds(1, 0))
                        .map(RaftPeerId::valueOf)
-                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
-                       .toArray(RaftPeer[]::new);
+                       .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress())).collect(
+            Collectors.toList());
+
+    setupServer();
+    setupClient();
+    runTestDataStream();
+  }
+
+  @Test
+  public void testDataStreamMultipleServer(){

Review comment:
       Yes. I was wondering whether we need a read API to verify that the write is complete. I would appreciate suggestions here: do we need a read API to write a better test?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507456334



##########
File path: ratis-test/src/test/java/org/apache/ratis/netty/TestRaftAsyncWithNetty.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.RaftAsyncTests;
+
+public class TestRaftAsyncWithNetty extends RaftAsyncTests<MiniRaftClusterWithNetty>

Review comment:
       Yes, wrong file. I have removed it.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r507009645



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -91,6 +106,11 @@ private void writeTo(ByteBuf buf, DataStream stream, boolean released) {
           throw new CompletionException(t);
         }
       }
+

Review comment:
       I have changed the code so that all writes happen in parallel, with an `allOf` that waits for all writes to finish before sending the reply.
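
A minimal sketch of that shape, using `Runnable`s in place of the real local and peer writes. `ParallelWrites.writeAll` is a hypothetical helper and not the code in the patch. It starts every write before waiting on any of them and runs the reply action once after all have completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ParallelWrites {
  /**
   * Starts the local write and every peer write without waiting in between,
   * then runs sendReply once after all of them have completed.
   */
  static CompletableFuture<Void> writeAll(Runnable localWrite, List<Runnable> peerWrites,
      Runnable sendReply) {
    final List<CompletableFuture<Void>> writes = new ArrayList<>();
    writes.add(CompletableFuture.runAsync(localWrite));
    for (Runnable peerWrite : peerWrites) {
      writes.add(CompletableFuture.runAsync(peerWrite));
    }
    return CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]))
        .thenRun(sendReply);
  }
}
```

`runAsync` uses the common fork-join pool by default; a real server would more likely pass its own executor or rely on futures returned by the async stream API.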







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #213: RATIS-1082. Netty stream server should forward the data to the other servers in the group

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #213:
URL: https://github.com/apache/incubator-ratis/pull/213#discussion_r503646850



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -51,27 +57,52 @@
   private RandomAccessFile stream;
   private FileChannel fileChannel;
   private File file = new File("client-data-stream");
+  private List<DataStreamClientImpl> clients = new ArrayList<>();
+  private List<DataStreamOutput> streams = new ArrayList<>();
 
 
   public NettyServerStreamRpc(RaftPeer server){
     this.raftServer = server;
     setupServer();
   }
 
+  public NettyServerStreamRpc(
+      RaftPeer server, List<RaftPeer> otherPeers, RaftProperties properties){
+    this.raftServer = server;
+    setupServer();
+    setupClient(otherPeers, properties);
+  }
+
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         final DataStreamRequestByteBuf req = (DataStreamRequestByteBuf)msg;
+
+        List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+        // forward requests to other stream servers.
+        for (DataStreamOutput streamOutput : streams) {
+          CompletableFuture<DataStreamReply> future =
+              streamOutput.streamAsync(req.getBuf().nioBuffer());
+          futures.add(future);
+        }
+
         ByteBuffer[] bfs = req.getBuf().nioBuffers();
         for(int i = 0; i < bfs.length; i++){
           fileChannel.write(bfs[i]);
         }
-        req.getBuf().release();
-        final DataStreamReply reply = new DataStreamReplyByteBuffer(req.getStreamId(),
-                                                        req.getDataOffset(),
-                                                        ByteBuffer.wrap("OK".getBytes()));
-        ctx.writeAndFlush(reply);
+
+        try {
+          for (CompletableFuture<DataStreamReply> future : futures) {
+              future.join();

Review comment:
       Thanks @runzhiwang 
   
   Let me think about this and make changes accordingly.



