Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/04 00:26:07 UTC

[GitHub] [incubator-ratis] runzhiwang opened a new pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

runzhiwang opened a new pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250


   ## What changes were proposed in this pull request?
   
   Use thenApplyAsync instead of thenApply
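
For readers unfamiliar with the two methods, here is a minimal sketch of the difference, not taken from the patch. `thenApply` leaves the choice of thread to the future (the completing thread or the caller), while `thenApplyAsync` with an executor argument runs the callback on that executor. The class name and the thread name `write-executor` are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenApplyVsAsync {
  /** Returns the name of the thread that ran a thenApplyAsync callback. */
  static String asyncCallbackThread() {
    // Hypothetical executor with a recognisable thread name (not from the patch).
    final ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "write-executor"));
    try {
      return CompletableFuture.completedFuture("stream")
          .thenApplyAsync(s -> Thread.currentThread().getName(), executor)
          .join();
    } finally {
      executor.shutdown();
    }
  }

  /** Returns the name of the thread that ran a plain thenApply callback. */
  static String syncCallbackThread() {
    // No executor is given, so the callback runs on whichever thread
    // completes the future or registers the callback.
    return CompletableFuture.completedFuture("stream")
        .thenApply(s -> Thread.currentThread().getName())
        .join();
  }

  public static void main(String[] args) {
    System.out.println("thenApply ran on:      " + syncCallbackThread());
    System.out.println("thenApplyAsync ran on: " + asyncCallbackThread());
  }
}
```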
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1122
   
   ## How was this patch tested?
   
   No new unit tests are needed.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517275735



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       > ...  out.writeAsync was called by the same thread.
   
   We should not assume "the same thread" in async programming.   Anyway, let's merge this first.  We may work on it later.










[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517234760



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       Let me test it a little bit more.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515788059



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       We should have a single executor for all the streams.  Otherwise, it requires a thread per stream.  Since threads are a limited resource, the number of streams will be limited by the number of threads.  Also, creating more threads is not useful since I/O is bounded by the hardware (e.g. the number of disks).
   
   HDFS Datanode has this problem: it needs a thread per pipeline, so the number of pipelines is limited by the number of threads.
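
As an illustration of this point, the sketch below submits one write task for each of many streams to a single fixed-size pool and counts how many distinct threads were used. It is only a sketch: the class and method names are hypothetical, and the "write" is a placeholder that records the current thread name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedExecutorSketch {
  /**
   * Runs one placeholder write for each of numStreams streams on a shared
   * pool of poolSize threads and returns the number of distinct threads used.
   */
  static int distinctThreadsUsed(int numStreams, int poolSize) {
    final ExecutorService shared = Executors.newFixedThreadPool(poolSize);
    final Set<String> threadNames = ConcurrentHashMap.newKeySet();
    try {
      final List<CompletableFuture<Void>> writes = new ArrayList<>();
      for (int i = 0; i < numStreams; i++) {
        // Placeholder for a stream write: record which thread executed it.
        writes.add(CompletableFuture.runAsync(
            () -> threadNames.add(Thread.currentThread().getName()), shared));
      }
      CompletableFuture.allOf(writes.toArray(new CompletableFuture[0])).join();
      return threadNames.size();
    } finally {
      shared.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("100 streams served by "
        + distinctThreadsUsed(100, 4) + " thread(s)");
  }
}
```

The number of threads stays bounded by the pool size no matter how many streams are open, and the pool's own queue decides which thread picks up each task.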

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       After RATIS-1126, we may simply use NettyServerStreamRpc.executorService.  Or, we may add another
   ```
   private final ExecutorService streamExecutor;
   ```
   for the stream write() and close().

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -126,11 +127,14 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService executorService;

Review comment:
       We don't have to add executorService in StreamInfo.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       > Because one stream must use the same thread to write, ...
   
   No.  We should not require using the same thread.  This is not the async model.
   
   Think about this: at any moment, our program must guarantee that only one thread is calling channel.write(buffer).  Therefore, the program must be synchronised somewhere.  So, we may safely use a thread pool (with multiple threads).
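
One way to get that guarantee without pinning a stream to a thread is to chain each write to the previous one, in the spirit of the `previous` reference shown in the diff. The sketch below is an assumption-laden illustration, not the Ratis code: the class and method names are hypothetical, and the "write" only appends a packet number to a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ChainedWritesSketch {
  /** Issues numWrites chained writes on a multi-threaded pool; returns the order they ran in. */
  static List<Integer> writeInOrder(int numWrites, int poolSize) {
    final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    final AtomicReference<CompletableFuture<?>> previous =
        new AtomicReference<>(CompletableFuture.completedFuture(null));
    // The synchronized list only guards the list itself;
    // the ordering comes from the future chain.
    final List<Integer> written = Collections.synchronizedList(new ArrayList<>());
    try {
      // All requests are issued from this one thread in the sketch.
      for (int i = 0; i < numWrites; i++) {
        final int packet = i;
        // The next write starts only after the previous write has completed,
        // so at most one pool thread is writing at any moment.
        final CompletableFuture<?> next =
            previous.get().thenRunAsync(() -> written.add(packet), pool);
        previous.set(next);
      }
      previous.get().join();
      return new ArrayList<>(written);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(writeInOrder(10, 4));
  }
}
```

Different writes may run on different pool threads, but the chain keeps them sequential, so the write method itself needs no lock.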

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       > ... I am not sure one thread is enough for all streams. ...
   
   I mean a single executor with multiple threads.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -224,8 +232,26 @@ public NettyServerStreamRpc(RaftServer server) {
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .bind(port);
     this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
-    this.executorService = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
+
+    int threadPoolSize = RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties());
+    for (int i = 0; i < threadPoolSize; i ++) {
+      this.streamExecutors.add(Executors.newFixedThreadPool(1));
+    }
+  }
+
+  private synchronized ExecutorService getMinTaskExecutor() {
+    ExecutorService minTaskExecutor = null;
+    int minTaskNum = Integer.MAX_VALUE;
+
+    for (ExecutorService e : streamExecutors) {
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) e;
+      if (minTaskNum > tpe.getQueue().size()) {
+        minTaskNum = tpe.getQueue().size();
+        minTaskExecutor = e;
+      }
+    }
+
+    return minTaskExecutor;
   }

Review comment:
       We should just use the executor service and not try to do thread scheduling ourselves.  The existing ExecutorService is good enough.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517237954



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       Good catch.  The write should also wait for the previous.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()
+          .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));

Review comment:
       We should do the same for the remote writes.
   ```
         final CompletableFuture<?> previous = info.getPrevious().get();
         localWrite = previous.thenComposeAsync(v -> info.getStream())
             .thenApplyAsync(stream -> writeTo(buf, stream), executorService);
         for (DataStreamOutput out : info.getDataStreamOutputs()) {
           remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
         }
   ```
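
The effect of wrapping a write in `thenComposeAsync` can be seen in isolation with the sketch below. The function passed to it is not invoked until `previous` completes, so the write is not even started early. The names here are hypothetical, and a completed future stands in for `out.writeAsync(...)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ComposeAfterPreviousSketch {
  /** Returns the recorded events in the order they happened. */
  static List<String> run() {
    final ExecutorService executor = Executors.newFixedThreadPool(4);
    final List<String> events = Collections.synchronizedList(new ArrayList<>());
    try {
      final CompletableFuture<Void> previous = new CompletableFuture<>();

      // Stand-in for a remote write: the lambda body is the point at
      // which the asynchronous write would be started.
      final CompletableFuture<String> remoteWrite = previous.thenComposeAsync(v -> {
        events.add("remote write started");
        return CompletableFuture.completedFuture("reply");
      }, executor);

      // Nothing has run yet; the write starts only once previous completes.
      events.add("previous completed");
      previous.complete(null);

      remoteWrite.join();
      return new ArrayList<>(events);
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```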

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()
+          .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);

Review comment:
       We should use thenComposeAsync; see the comment below.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517276483



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       Agreed, let me change it in RATIS-1083.







[GitHub] [incubator-ratis] runzhiwang commented on pull request #250: Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#issuecomment-720286069


   @szetszwo Could you help review this?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515811727



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       @szetszwo Thanks for the suggestions. I agree. Besides, I am not sure one thread is enough for all streams. So I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest queued tasks to each new stream.  Because one stream must use the same thread to write, I cannot create a single ExecutorService with 4 threads and have all the streams share it.  What do you think?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo It will cause an error.
   
   1. Because one stream must use the same thread to write, if we use NettyServerStreamRpc.executorService, multiple threads may call write on the same stream at the same time, and an error will occur. We would then have to add synchronized to the write method, which is not appropriate because it makes threads wait for the lock.
   
   2. So I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest queued tasks to each new stream. Each stream then always uses the same thread to write, and no thread needs to wait for a lock.
   
   3. I remove NettyServerStreamRpc.executorService and use the stream's assigned ExecutorService.








[GitHub] [incubator-ratis] runzhiwang closed pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250









[GitHub] [incubator-ratis] szetszwo merged pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250


   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517269238



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       @szetszwo The remote writes will not go out of order even without waiting for previous, because the requests are sent by OrderedStreamAsync and out.writeAsync was called by the same thread.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r517230295



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -318,21 +318,21 @@ private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()

Review comment:
       It seems that only the local write will wait for previous but the remote writes won't.  Then, the remote writes may go out of order.










[GitHub] [incubator-ratis] runzhiwang commented on pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#issuecomment-721582593


   @szetszwo I have updated the patch using your suggestions. Could you help review it again?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515941732



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       >  for any moment of time, our program must guarantee only one thread calling channel.write(buffer)
   
   @szetszwo The current implementation cannot guarantee that only one thread calls channel.write(buffer) at a time. For example, if we only change `localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));` to `localWrite = info.getStream().thenApplyAsync(stream -> writeTo(buf, stream), executorService);`, the unit tests do not pass.
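
A rough sketch of why the one-line change alone is not enough: two callbacks attached to the same already-completed stream future do not depend on each other, so a pool with more than one thread is free to run them concurrently or in either order. The names below are hypothetical, and a latch stands in for the two writes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class UnchainedAsyncSketch {
  /**
   * Returns true if the second callback ran without waiting for the
   * first one to finish, i.e. nothing serialised the two "writes".
   */
  static boolean secondRunsWithoutWaitingForFirst() {
    final ExecutorService executorService = Executors.newFixedThreadPool(2);
    final CompletableFuture<String> stream = CompletableFuture.completedFuture("stream");
    final CountDownLatch secondStarted = new CountDownLatch(1);
    try {
      // First "write": cannot finish until the second one has started.
      final CompletableFuture<Boolean> first = stream.thenApplyAsync(s -> {
        try {
          return secondStarted.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }, executorService);

      // Second "write": attached to the same future, not to the first write.
      final CompletableFuture<Void> second =
          stream.thenRunAsync(secondStarted::countDown, executorService);

      second.join();
      return first.join();
    } finally {
      executorService.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("second ran without waiting for first: "
        + secondRunsWithoutWaitingForFirst());
  }
}
```

If the second callback were chained to the first, the first would time out and the method would return false. Chaining each write to the previous one restores the ordering.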

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo Could you help review this comment?



