You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ar...@apache.org on 2019/03/21 16:46:38 UTC

[incubator-ratis] branch master updated: RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.

This is an automated email from the ASF dual-hosted git repository.

arp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc5ace  RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.
1fc5ace is described below

commit 1fc5acedda4dbd7c62fe73c8656c792871f4661f
Author: Arpit Agarwal <ar...@apache.org>
AuthorDate: Thu Mar 21 09:46:19 2019 -0700

    RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.
---
 .../apache/ratis/server/impl/RaftServerProxy.java    | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6353902..4a0f04c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -55,10 +55,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -160,6 +157,8 @@ public class RaftServerProxy implements RaftServer {
   private final RaftServerRpc serverRpc;
   private final ServerFactory factory;
 
+  private ExecutorService implExecutor;
+
   private final ImplMap impls = new ImplMap();
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
@@ -173,6 +172,8 @@ public class RaftServerProxy implements RaftServer {
     this.serverRpc = factory.newRaftServerRpc(this);
     this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
     this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
+
+    this.implExecutor = Executors.newSingleThreadExecutor();
   }
 
   /** Check the storage dir and add groups*/
@@ -208,7 +209,7 @@ public class RaftServerProxy implements RaftServer {
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
       }
-    });
+    }, implExecutor);
   }
 
   private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -302,6 +303,13 @@ public class RaftServerProxy implements RaftServer {
 
   @Override
   public void close() {
+    try {
+      implExecutor.shutdown();
+      implExecutor.awaitTermination(1, TimeUnit.DAYS);
+    } catch (Exception e) {
+      LOG.warn(getId() + ": Failed to shutdown " + getRpcType() + " server");
+    }
+
     lifeCycle.checkStateAndClose(() -> {
       LOG.info("{}: close", getId());
       impls.close();
@@ -373,7 +381,7 @@ public class RaftServerProxy implements RaftServer {
           final boolean started = newImpl.start();
           Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl);
           return new RaftClientReply(request, newImpl.getCommitInfos());
-        })
+        }, implExecutor)
         .whenComplete((_1, throwable) -> {
           if (throwable != null) {
             impls.remove(newGroup.getGroupId());