Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:07:42 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #250: RATIS-1122. Use thenApplyAsync instead of thenApply

szetszwo commented on a change in pull request #250:
URL: https://github.com/apache/incubator-ratis/pull/250#discussion_r515788059



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       We should have a single executor for all the streams.  Otherwise, it requires a thread per stream.  Since threads are a limited resource, the number of streams would be limited by the number of threads.  Also, creating more threads is not useful since IO is bounded by the hardware (e.g. the number of disks).
   
   HDFS Datanode has this problem: it needs a thread per pipeline, so the number of pipelines is limited by the number of threads.
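
As a rough illustration of the point above, the sketch below runs work for many streams on one shared fixed-size pool and records which threads did the work. The class and method names (`SharedExecutorSketch`, `threadsUsed`) are hypothetical and are not part of Ratis; the stream count and pool size are arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo class, not part of Ratis. */
class SharedExecutorSketch {
  /** Runs one task per stream on a single shared pool and returns the names of the threads used. */
  static Set<String> threadsUsed(int numStreams, int poolSize) {
    final ExecutorService shared = Executors.newFixedThreadPool(poolSize);
    final Set<String> names = ConcurrentHashMap.newKeySet();
    final CountDownLatch done = new CountDownLatch(numStreams);
    try {
      for (int stream = 0; stream < numStreams; stream++) {
        // Every stream submits to the same executor; no per-stream thread is created.
        shared.execute(() -> {
          names.add(Thread.currentThread().getName());
          done.countDown();
        });
      }
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      shared.shutdown();
    }
    return names;
  }

  public static void main(String[] args) {
    // 1000 streams, but never more than 4 worker threads.
    System.out.println("threads used: " + threadsUsed(1000, 4).size());
  }
}
```

The number of threads stays bounded by the pool size regardless of how many streams are open, which is the property a thread-per-stream design lacks.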

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       After RATIS-1126, we may simply use NettyServerStreamRpc.executorService.  Or, we may add another
   ```
   private final ExecutorService streamExecutor;
   ```
   for the stream write() and close().

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -126,11 +127,14 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService executorService;

Review comment:
       We don't have to add executorService in StreamInfo.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -210,6 +218,8 @@ StreamInfo get(Key key) {
 
   private final ExecutorService executorService;
 
+  private final List<ExecutorService> streamExecutors = new ArrayList<>();
+

Review comment:
       > Because one stream must use the same thread to write, ...
   
   No.  We should not require using the same thread.  This is not the async model.
   
   Think about this: at any moment, our program must guarantee that only one thread is calling channel.write(buffer).  Therefore, the program must be synchronised somewhere.  So, we may safely use a thread pool (with multiple threads).
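
One way to get that guarantee without pinning a stream to a thread is to chain each write onto the future of the previous write, in the spirit of the `previous` field in the diff. The following is only a sketch under that assumption: `ChainedStreamSketch` and its methods are hypothetical stand-ins, and a plain list replaces the real channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical per-stream state for illustration; not the Ratis StreamInfo class. */
class ChainedStreamSketch {
  // Tail of the chain of writes scheduled so far for this stream.
  private final AtomicReference<CompletableFuture<Void>> previous
      = new AtomicReference<>(CompletableFuture.completedFuture(null));
  // A plain ArrayList on purpose: the future chain is the only synchronisation.
  private final List<Integer> written = new ArrayList<>();

  /** Schedules a write that starts only after the previously scheduled write has completed. */
  CompletableFuture<Void> write(int data, Executor executor) {
    final CompletableFuture<Void> current = new CompletableFuture<>();
    // Atomically install this write as the new tail and obtain the old tail.
    final CompletableFuture<Void> prev = previous.getAndSet(current);
    prev.thenRunAsync(() -> written.add(data), executor)
        .whenComplete((v, t) -> {
          if (t != null) {
            current.completeExceptionally(t);
          } else {
            current.complete(null);
          }
        });
    return current;
  }

  List<Integer> getWritten() {
    return written;
  }

  public static void main(String[] args) {
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    final ChainedStreamSketch stream = new ChainedStreamSketch();
    CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
    for (int i = 0; i < 10; i++) {
      last = stream.write(i, pool);
    }
    last.join();
    pool.shutdown();
    // The values appear in submission order even though any pool thread may run a write.
    System.out.println(stream.getWritten());
  }
}
```

Each write may run on a different pool thread, yet at most one write per stream is in progress at a time because write N+1 is triggered only by the completion of write N.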

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -123,11 +125,13 @@ void close() {
     private final List<DataStreamOutput> outs;
     private final AtomicReference<CompletableFuture<?>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    private final ExecutorService writeExecutor;

Review comment:
       > ... I am not sure one thread is enough for all streams. ...
   
   I mean a single executor with multiple threads.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -224,8 +232,26 @@ public NettyServerStreamRpc(RaftServer server) {
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .bind(port);
     this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
-    this.executorService = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
+
+    int threadPoolSize = RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties());
+    for (int i = 0; i < threadPoolSize; i ++) {
+      this.streamExecutors.add(Executors.newFixedThreadPool(1));
+    }
+  }
+
+  private synchronized ExecutorService getMinTaskExecutor() {
+    ExecutorService minTaskExecutor = null;
+    int minTaskNum = Integer.MAX_VALUE;
+
+    for (ExecutorService e : streamExecutors) {
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) e;
+      if (minTaskNum > tpe.getQueue().size()) {
+        minTaskNum = tpe.getQueue().size();
+        minTaskExecutor = e;
+      }
+    }
+
+    return minTaskExecutor;
   }

Review comment:
       We should just use the executor service and not try to do thread scheduling ourselves.  The existing ExecutorService is good enough.
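
To illustrate why a single pool can be enough, the sketch below shows that the threads of one fixed pool share a single work queue, so tasks are not stuck behind a slow task on another thread. The names (`SharedQueueSketch`, `fastTasksFinishWhileSlowTaskRuns`) and the pool size of 2 are hypothetical choices for the demo, not Ratis code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class, not part of Ratis. */
class SharedQueueSketch {
  /** Returns true if ten fast tasks complete while one slow task still occupies a pool thread. */
  static boolean fastTasksFinishWhileSlowTaskRuns() {
    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final CountDownLatch release = new CountDownLatch(1);
    final CountDownLatch fastDone = new CountDownLatch(10);
    try {
      // The slow task blocks one thread until it is released below.
      pool.execute(() -> {
        try {
          release.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // The fast tasks go into the pool's shared queue; the free thread drains it.
      for (int i = 0; i < 10; i++) {
        pool.execute(fastDone::countDown);
      }
      return fastDone.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("fast tasks finished: " + fastTasksFinishWhileSlowTaskRuns());
  }
}
```

With a list of single-thread executors selected by queue size, a task can end up queued behind a long-running task on its chosen executor while other executors sit idle, since the queue size does not reflect the task that is currently running.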



