You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/06 11:29:16 UTC
[ignite-3] branch ignite-13885 updated: IGNITE-13885 Fixed default execution.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-13885 by this push:
new 1c57431 IGNITE-13885 Fixed default execution.
1c57431 is described below
commit 1c57431f24d95a00394f1199064448f85d601bb6
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 6 14:29:05 2021 +0300
IGNITE-13885 Fixed default execution.
---
.../com/alipay/sofa/jraft/RaftGroupService.java | 2 +-
.../java/com/alipay/sofa/jraft/core/NodeImpl.java | 1 +
.../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 65 +++++++++++++++++-----
.../impl/core/AppendEntriesRequestProcessor.java | 4 +-
.../impl/core/InstallSnapshotRequestProcessor.java | 1 +
5 files changed, 57 insertions(+), 16 deletions(-)
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
index c41414d..a451ed3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
@@ -159,7 +159,7 @@ public class RaftGroupService {
this.rpcServer.shutdown();
}
} catch (final Exception ignored) {
- // ignore
+ // ignore TODO asch not good to ignore ?
}
this.rpcServer = null;
}
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
index dedc2aa..789122e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -2121,6 +2121,7 @@ public class NodeImpl implements Node, RaftServerService {
}
}
+ // TODO asch TBD how follower notifies about catchup.
private void onCaughtUp(final PeerId peer, final long term, final long version, final Status st) {
this.writeLock.lock();
try {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index c3d17b9..a24427d 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -29,8 +30,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Local RPC server impl.
@@ -38,6 +45,8 @@ import java.util.function.Consumer;
* @author ascherbakov.
*/
public class LocalRpcServer implements RpcServer {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalRpcServer.class);
+
/** Running servers. */
public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
@@ -54,7 +63,9 @@ public class LocalRpcServer implements RpcServer {
private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
- BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch use some kind of MPSC queue.
+ BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
+
+ private ExecutorService defaultExecutor;
public LocalRpcServer(Endpoint local) {
this.local = local;
@@ -142,20 +153,34 @@ public class LocalRpcServer implements RpcServer {
}
}
- prc.handleRequest(new RpcContext() {
- @Override public void sendResponse(Object responseObj) {
- fut.complete(responseObj);
- }
+ RpcProcessor.ExecutorSelector selector = prc.executorSelector();
- @Override public Connection getConnection() {
- return conn;
- }
+ Executor executor = null;
- @Override public String getRemoteAddress() {
- return sender.toString();
- }
- }, msg);
+ if (selector != null) {
+ executor = selector.select(null, msg);
+ }
+
+ if (executor == null)
+ executor = defaultExecutor;
+
+ RpcProcessor finalPrc = prc;
+
+ executor.execute(() -> {
+ finalPrc.handleRequest(new RpcContext() {
+ @Override public void sendResponse(Object responseObj) {
+ fut.complete(responseObj);
+ }
+
+ @Override public Connection getConnection() {
+ return conn;
+ }
+ @Override public String getRemoteAddress() {
+ return sender.toString();
+ }
+ }, msg);
+ });
} catch (InterruptedException e) {
return;
}
@@ -163,7 +188,9 @@ public class LocalRpcServer implements RpcServer {
}
});
- worker.setName("LocalRPCServer-Thread: " + local.toString());
+ defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
+
+ worker.setName("LocalRPCServer-Dispatch-Thread: " + local.toString());
worker.start();
servers.put(local, this);
@@ -178,6 +205,7 @@ public class LocalRpcServer implements RpcServer {
return;
started = false;
+
worker.interrupt();
try {
worker.join();
@@ -185,6 +213,17 @@ public class LocalRpcServer implements RpcServer {
throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
}
+ defaultExecutor.shutdownNow();
+
+ try {
+ boolean stopped = defaultExecutor.awaitTermination(60_000, TimeUnit.MILLISECONDS);
+
+ if (!stopped) // TODO asch make thread dump.
+ LOG.error("Failed to wait for graceful executor shutdown, probably some task is hanging.");
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
+ }
+
// Close all connections to this server.
for (LocalRpcClient client : conns.keySet())
closeConnection(client, local);
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index 811bc2a..8b45827 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -66,9 +66,9 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
super();
}
- @Override
+ @Override // TODO asch should be select(Message msg)
public Executor select(final String reqClass, final Object reqHeader) {
- final AppendEntriesRequestHeader header = (AppendEntriesRequestHeader) reqHeader;
+ final AppendEntriesRequest header = (AppendEntriesRequest) reqHeader;
final String groupId = header.getGroupId();
final String peerId = header.getPeerId();
final String serverId = header.getServerId();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
index d6186af..10f15b3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
@@ -30,6 +30,7 @@ import com.alipay.sofa.jraft.rpc.Message;
* @author boyan (boyan@alibaba-inc.com)
*
* 2018-Apr-08 6:09:34 PM
+ * TODO asch use dedicated executor for potentially long jobs ?
*/
public class InstallSnapshotRequestProcessor extends NodeRequestProcessor<InstallSnapshotRequest> {