You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ar...@apache.org on 2019/03/21 16:46:38 UTC
[incubator-ratis] branch master updated: RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.
This is an automated email from the ASF dual-hosted git repository.
arp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc5ace RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.
1fc5ace is described below
commit 1fc5acedda4dbd7c62fe73c8656c792871f4661f
Author: Arpit Agarwal <ar...@apache.org>
AuthorDate: Thu Mar 21 09:46:19 2019 -0700
RATIS-507. RaftServerProxy should not use common thread pool for creating raft server. Contributed by Mukul Kumar Singh.
---
.../apache/ratis/server/impl/RaftServerProxy.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6353902..4a0f04c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -55,10 +55,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -160,6 +157,8 @@ public class RaftServerProxy implements RaftServer {
private final RaftServerRpc serverRpc;
private final ServerFactory factory;
+ private ExecutorService implExecutor;
+
private final ImplMap impls = new ImplMap();
RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
@@ -173,6 +172,8 @@ public class RaftServerProxy implements RaftServer {
this.serverRpc = factory.newRaftServerRpc(this);
this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
+
+ this.implExecutor = Executors.newSingleThreadExecutor();
}
/** Check the storage dir and add groups*/
@@ -208,7 +209,7 @@ public class RaftServerProxy implements RaftServer {
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
- });
+ }, implExecutor);
}
private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -302,6 +303,13 @@ public class RaftServerProxy implements RaftServer {
@Override
public void close() {
+ try {
+ implExecutor.shutdown();
+ implExecutor.awaitTermination(1, TimeUnit.DAYS);
+ } catch (Exception e) {
+ LOG.warn(getId() + ": Failed to shutdown " + getRpcType() + " server");
+ }
+
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: close", getId());
impls.close();
@@ -373,7 +381,7 @@ public class RaftServerProxy implements RaftServer {
final boolean started = newImpl.start();
Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl);
return new RaftClientReply(request, newImpl.getCommitInfos());
- })
+ }, implExecutor)
.whenComplete((_1, throwable) -> {
if (throwable != null) {
impls.remove(newGroup.getGroupId());