Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:01:22 UTC

[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

runzhiwang commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515811727



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       @szetszwo Thanks for the suggestions. I agree. However, I am not sure that one thread is enough for all streams, so I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. What do you think?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       @szetszwo Thanks for the suggestions. I agree. However, I am not sure that one thread is enough for all streams, so I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Because all writes of one stream must run on the same thread, I cannot create a single ExecutorService with 4 threads and let all the streams share it. What do you think?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       @szetszwo Thanks for the suggestions. I agree. However, I am not sure that one thread is enough for all streams, so I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Because all writes of one stream must run on the same thread, I cannot create a single ExecutorService with 4 threads and let all the streams share it. What do you think?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       @szetszwo Thanks for the suggestions. I agree. However, I am not sure that one thread is enough for all streams, so I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Because all writes of one stream must run on the same thread, I cannot create a single ExecutorService with 4 threads and let all the streams share it. What do you think?
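
   The assignment scheme described above could be sketched roughly as follows. This is only an illustration, not the code in the pull request: the class `StreamExecutors`, its `assign()` and `load()` methods, and the reading of "fewest tasks" as queued plus running tasks are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not a class from Ratis.
final class StreamExecutors {
  private final List<ThreadPoolExecutor> executors = new ArrayList<>();

  StreamExecutors(int size) {
    for (int i = 0; i < size; i++) {
      // Exactly one worker thread per executor, with an unbounded FIFO queue.
      executors.add(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>()));
    }
  }

  // Assumption: "fewest tasks" is read as queued tasks plus running tasks.
  static int load(ThreadPoolExecutor e) {
    return e.getQueue().size() + e.getActiveCount();
  }

  // Returns the least-loaded executor; a new stream keeps it for all its writes.
  synchronized ThreadPoolExecutor assign() {
    ThreadPoolExecutor best = executors.get(0);
    for (ThreadPoolExecutor e : executors) {
      if (load(e) < load(best)) {
        best = e;
      }
    }
    return best;
  }

  void shutdown() {
    executors.forEach(ThreadPoolExecutor::shutdown);
  }

  public static void main(String[] args) {
    StreamExecutors pool = new StreamExecutors(4);
    ThreadPoolExecutor first = pool.assign();

    // Keep the first executor busy so that it is no longer the least loaded.
    CompletableFuture<Void> started = new CompletableFuture<>();
    CompletableFuture<Void> release = new CompletableFuture<>();
    first.execute(() -> { started.complete(null); release.join(); });
    started.join();

    ThreadPoolExecutor second = pool.assign();
    System.out.println("different executor: " + (first != second));

    release.complete(null);
    pool.shutdown();
  }
}
```

   Each stream would call `assign()` once when it is created and then submit every write to the returned executor, so load is balanced across streams rather than across individual writes.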

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo That would cause an error. All writes of one stream must run on the same thread. If we use NettyServerStreamRpc.executorService, multiple threads can call write for the same stream, and errors will occur. We would then have to make the write method synchronized, which is not appropriate because threads would have to wait for the lock.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo That would cause an error. All writes of one stream must run on the same thread. If we use NettyServerStreamRpc.executorService, multiple threads can call write for the same stream, and errors will occur. We would then have to make the write method synchronized, which is not appropriate because threads would have to wait for the lock. So I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Each stream then always writes on the same thread, and no thread needs to wait for a lock.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo That would cause an error.
   
   1. All writes of one stream must run on the same thread. If we use NettyServerStreamRpc.executorService, multiple threads can call write for the same stream, and errors will occur. We would then have to make the write method synchronized, which is not appropriate because threads would have to wait for the lock.
   
   2. So I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Each stream then always writes on the same thread, and no thread needs to wait for a lock.
   
   3. I remove NettyServerStreamRpc.executorService and use the stream's assigned ExecutorService instead.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo That would cause an error.
   
   1. All writes of one stream must run on the same thread. If we use NettyServerStreamRpc.executorService, multiple threads can call write for the same stream at the same time, and errors will occur. We would then have to make the write method synchronized, which is not appropriate because threads would have to wait for the lock.
   
   2. So I create 4 ExecutorService instances, each with a single thread in its pool, and assign the one with the fewest tasks to each new stream. Each stream then always writes on the same thread, and no thread needs to wait for a lock.
   
   3. I remove NettyServerStreamRpc.executorService and use the stream's assigned ExecutorService instead.
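
   The point about writing on one thread without a lock can be illustrated with a small sketch. It is not the Ratis code: `FakeStream` is a hypothetical stand-in for a stream that is unsafe for concurrent writes, and `writeAll` is a helper invented for the example. When every write of a stream is submitted to the same single-thread executor, all writes run on one thread and, since they are submitted from one thread here, in submission order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only; none of these types exist in Ratis.
final class PerStreamExecutorDemo {
  /** Hypothetical stream that is not safe for concurrent writes. */
  static final class FakeStream {
    final List<Integer> written = new ArrayList<>();
    final Set<String> writerThreads = new HashSet<>();

    // Deliberately unsynchronized: safety relies on a single writer thread.
    void write(int packet) {
      writerThreads.add(Thread.currentThread().getName());
      written.add(packet);
    }
  }

  /** Submits every write of one stream to the stream's own executor. */
  static FakeStream writeAll(int packets, ExecutorService streamExecutor) {
    FakeStream stream = new FakeStream();
    CompletableFuture<FakeStream> streamFuture = CompletableFuture.completedFuture(stream);
    List<CompletableFuture<?>> writes = new ArrayList<>();
    for (int i = 0; i < packets; i++) {
      final int packet = i;
      writes.add(streamFuture.thenApplyAsync(s -> {
        s.write(packet);
        return s;
      }, streamExecutor));
    }
    // Wait for all writes before the caller inspects the stream.
    CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0])).join();
    return stream;
  }

  public static void main(String[] args) {
    ExecutorService streamExecutor = Executors.newSingleThreadExecutor();
    FakeStream stream = writeAll(1000, streamExecutor);
    System.out.println("packets written: " + stream.written.size());
    System.out.println("writer threads: " + stream.writerThreads.size());
    streamExecutor.shutdown();
  }
}
```

   Passing a multi-threaded pool as `streamExecutor` would let several threads call `write` concurrently on the unsynchronized `FakeStream`, which is the situation item 1 describes.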

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       >  for any moment of time, our program must guarantee only one thread calling channel.write(buffer)
   
   @szetszwo The current implementation cannot guarantee that only one thread calls channel.write(buffer) at any given time.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       >  for any moment of time, our program must guarantee only one thread calling channel.write(buffer)
   
   @szetszwo The current implementation cannot guarantee that only one thread calls channel.write(buffer) at any given time. For example, if we only change `localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));` to `localWrite = info.getStream().thenApplyAsync(stream -> writeTo(buf, stream), executorService);`, the unit tests do not pass.
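
   For readers less familiar with the two calls, the following sketch shows which thread runs the callback in each case. It is a generic `CompletableFuture` illustration rather than the Ratis code; the class and method names and the thread name `write-thread` are made up for the example. With `thenApply` on an already completed future, the callback runs on the calling thread (on a future that is not yet complete, it typically runs on whichever thread completes it); with `thenApplyAsync` and an explicit executor, it runs on one of that executor's threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only; names are hypothetical.
final class ApplyVsApplyAsync {
  /** Returns the name of the thread that ran the thenApply callback. */
  static String threadOfThenApply(CompletableFuture<String> stream) {
    return stream.thenApply(s -> Thread.currentThread().getName()).join();
  }

  /** Returns the name of the thread that ran the thenApplyAsync callback. */
  static String threadOfThenApplyAsync(CompletableFuture<String> stream, Executor executor) {
    return stream.thenApplyAsync(s -> Thread.currentThread().getName(), executor).join();
  }

  public static void main(String[] args) {
    ExecutorService writeExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "write-thread"));
    CompletableFuture<String> stream = CompletableFuture.completedFuture("stream");

    System.out.println("thenApply ran on: " + threadOfThenApply(stream));
    System.out.println("thenApplyAsync ran on: " + threadOfThenApplyAsync(stream, writeExecutor));

    writeExecutor.shutdown();
  }
}
```

   If the executor passed to `thenApplyAsync` has more than one thread, callbacks registered for the same stream may run on different threads, possibly at the same time.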

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       @szetszwo Could you help review this comment?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org