Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/10/01 13:00:36 UTC

[GitHub] [incubator-ratis] szetszwo opened a new pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

szetszwo opened a new pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212


   See https://issues.apache.org/jira/browse/RATIS-1081


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] amaliujia commented on pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#issuecomment-702272093


   LGTM





[GitHub] [incubator-ratis] szetszwo commented on pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#issuecomment-704900136


   @runzhiwang and @amaliujia thanks for reviewing this.
   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#discussion_r498392458



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -33,41 +37,76 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
 
-  private RaftPeer raftServer;
-  private EventLoopGroup bossGroup = new NioEventLoopGroup();
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private ChannelFuture channelFuture;
-  private RandomAccessFile stream;
-  private FileChannel fileChannel;
-  private File file = new File("client-data-stream");
+  private final RaftPeer raftServer;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final ChannelFuture channelFuture;
 
+  private final StateMachine stateMachine;
+  private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
 
-  public NettyServerStreamRpc(RaftPeer server){
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
-    setupServer();
+    this.stateMachine = stateMachine;
+    this.channelFuture = buildChannel();
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+    try {
+      // TODO: read the request from buf

Review comment:
       Nit: consider creating a Jira to track this TODO.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#discussion_r498574780



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -33,41 +37,76 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
 
-  private RaftPeer raftServer;
-  private EventLoopGroup bossGroup = new NioEventLoopGroup();
-  private EventLoopGroup workerGroup = new NioEventLoopGroup();
-  private ChannelFuture channelFuture;
-  private RandomAccessFile stream;
-  private FileChannel fileChannel;
-  private File file = new File("client-data-stream");
+  private final RaftPeer raftServer;
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final ChannelFuture channelFuture;
 
+  private final StateMachine stateMachine;
+  private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
 
-  public NettyServerStreamRpc(RaftPeer server){
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
     this.raftServer = server;
-    setupServer();
+    this.stateMachine = stateMachine;
+    this.channelFuture = buildChannel();
+  }
+
+  private CompletableFuture<DataStream> getDataStreamFuture(ByteBuf buf, AtomicBoolean released) {
+    try {
+      // TODO: read the request from buf

Review comment:
       Thanks. I have just filed RATIS-1085.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#discussion_r499399153



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -72,27 +129,47 @@ public void testDataStream(){
   }
 
   public void runTestDataStream(){
-    DataStreamOutput stream = client.stream();
-    ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
-    for (int i = 0; i < bf.capacity(); i++) {
-      bf.put((byte)'a');
-    }
-    bf.flip();
-    int i = 0;
+    final int bufferSize = 1024*1024;
+    final int bufferNum = 10;
+    final DataStreamOutput out = client.stream();
 
-    while(i < 1000){
-      bf.position(0).limit(bf.capacity());
-      futures.add(stream.streamAsync(bf));
-      i++;
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(sendRequest(out, 1024));
+
+    //send data
+    final int halfBufferSize = bufferSize/2;
+    int dataSize = 0;
+    for(int i = 0; i < bufferNum; i++) {
+      final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+      final ByteBuffer bf = initBuffer(dataSize, size);
+      futures.add(out.streamAsync(bf));
+      dataSize += size;
     }
-    try {
-      Thread.sleep(1000*3);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+    for(int i = 0; i < futures.size(); i++){
+      futures.get(i).join();
     }
+    Assert.assertEquals(dataSize, byteWritten);
     shutDownSetup();
-    for(i = 0; i < futures.size(); i++){
-      Assert.assertTrue(futures.get(i).isDone());
+  }
+
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int size) {
+    // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
+    final ByteBuffer buffer = initBuffer(0, size);
+    return out.streamAsync(buffer);
+  }
+
+  static ByteBuffer initBuffer(int offset, int size) {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+    final int length = buffer.capacity();
+    buffer.position(0).limit(length);
+    final StringBuilder b = new StringBuilder();

Review comment:
       `b` seems to be unused.







[GitHub] [incubator-ratis] runzhiwang commented on pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#issuecomment-703524216


   LGTM. Just waiting for CI to pass.





[GitHub] [incubator-ratis] runzhiwang merged pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
runzhiwang merged pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212


   





[GitHub] [incubator-ratis] runzhiwang commented on pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#issuecomment-703964040


   @szetszwo Thanks for the patch; I have merged it. The failed unit test looks unrelated to this PR. @amaliujia Thanks for the review.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #212: RATIS-1081. Change NettyServerStreamRpc to write data to StateMachine.

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #212:
URL: https://github.com/apache/incubator-ratis/pull/212#discussion_r499432546



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStream.java
##########
@@ -72,27 +129,47 @@ public void testDataStream(){
   }
 
   public void runTestDataStream(){
-    DataStreamOutput stream = client.stream();
-    ByteBuffer bf = ByteBuffer.allocateDirect(1024*1024);
-    for (int i = 0; i < bf.capacity(); i++) {
-      bf.put((byte)'a');
-    }
-    bf.flip();
-    int i = 0;
+    final int bufferSize = 1024*1024;
+    final int bufferNum = 10;
+    final DataStreamOutput out = client.stream();
 
-    while(i < 1000){
-      bf.position(0).limit(bf.capacity());
-      futures.add(stream.streamAsync(bf));
-      i++;
+    final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    futures.add(sendRequest(out, 1024));
+
+    //send data
+    final int halfBufferSize = bufferSize/2;
+    int dataSize = 0;
+    for(int i = 0; i < bufferNum; i++) {
+      final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+      final ByteBuffer bf = initBuffer(dataSize, size);
+      futures.add(out.streamAsync(bf));
+      dataSize += size;
     }
-    try {
-      Thread.sleep(1000*3);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
+    for(int i = 0; i < futures.size(); i++){
+      futures.get(i).join();
     }
+    Assert.assertEquals(dataSize, byteWritten);
     shutDownSetup();
-    for(i = 0; i < futures.size(); i++){
-      Assert.assertTrue(futures.get(i).isDone());
+  }
+
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamOutput out, int size) {
+    // TODO RATIS-1085: create a RaftClientRequest and put it in the buffer
+    final ByteBuffer buffer = initBuffer(0, size);
+    return out.streamAsync(buffer);
+  }
+
+  static ByteBuffer initBuffer(int offset, int size) {
+    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+    final int length = buffer.capacity();
+    buffer.position(0).limit(length);
+    final StringBuilder b = new StringBuilder();

Review comment:
       Yes, it is useless.
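
       For illustration only, a minimal sketch of what `initBuffer` might look like without the unused `StringBuilder`. The quoted hunk is truncated, so the fill logic here is an assumption and not the code from the patch: the class name `InitBufferSketch`, the helper `byteAt`, and the `'a' + pos % 26` pattern are all hypothetical. The idea is only that each byte is derived from its absolute position in the stream (`offset + i`), so a receiver could check the data it gets.

```java
import java.nio.ByteBuffer;

public class InitBufferSketch {
  /** Hypothetical pattern: the byte at non-negative stream position pos cycles through 'a'..'z'. */
  static byte byteAt(int pos) {
    return (byte) ('a' + pos % 26);
  }

  /** Allocates a direct buffer of the given size and fills it based on the stream offset. */
  static ByteBuffer initBuffer(int offset, int size) {
    final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
    for (int i = 0; i < size; i++) {
      buffer.put(byteAt(offset + i));
    }
    // flip() sets limit to the number of bytes written and position to 0,
    // so the buffer is ready to be read by the sender.
    buffer.flip();
    return buffer;
  }

  public static void main(String[] args) {
    final ByteBuffer bf = initBuffer(24, 4);
    System.out.println("remaining = " + bf.remaining());
  }
}
```

       With this shape no intermediate string is needed; the loop writes directly into the buffer.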



