You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/02/06 11:29:16 UTC

[ignite-3] branch ignite-13885 updated: IGNITE-13885 Fixed default execution.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch ignite-13885
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-13885 by this push:
     new 1c57431  IGNITE-13885 Fixed default execution.
1c57431 is described below

commit 1c57431f24d95a00394f1199064448f85d601bb6
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Sat Feb 6 14:29:05 2021 +0300

    IGNITE-13885 Fixed default execution.
---
 .../com/alipay/sofa/jraft/RaftGroupService.java    |  2 +-
 .../java/com/alipay/sofa/jraft/core/NodeImpl.java  |  1 +
 .../alipay/sofa/jraft/rpc/impl/LocalRpcServer.java | 65 +++++++++++++++++-----
 .../impl/core/AppendEntriesRequestProcessor.java   |  4 +-
 .../impl/core/InstallSnapshotRequestProcessor.java |  1 +
 5 files changed, 57 insertions(+), 16 deletions(-)

diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
index c41414d..a451ed3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/RaftGroupService.java
@@ -159,7 +159,7 @@ public class RaftGroupService {
                     this.rpcServer.shutdown();
                 }
             } catch (final Exception ignored) {
-                // ignore
+                // ignore TODO asch not good to ignore ?
             }
             this.rpcServer = null;
         }
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
index dedc2aa..789122e 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -2121,6 +2121,7 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
+    // TODO asch TBD how follower notifies about catchup.
     private void onCaughtUp(final PeerId peer, final long term, final long version, final Status st) {
         this.writeLock.lock();
         try {
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
index c3d17b9..a24427d 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/LocalRpcServer.java
@@ -22,6 +22,7 @@ import com.alipay.sofa.jraft.rpc.RpcContext;
 import com.alipay.sofa.jraft.rpc.RpcProcessor;
 import com.alipay.sofa.jraft.rpc.RpcServer;
 import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.NamedThreadFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -29,8 +30,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Local RPC server impl.
@@ -38,6 +45,8 @@ import java.util.function.Consumer;
  * @author ascherbakov.
  */
 public class LocalRpcServer implements RpcServer {
+    private static final Logger LOG                    = LoggerFactory.getLogger(LocalRpcServer.class);
+
     /** Running servers. */
     public static ConcurrentMap<Endpoint, LocalRpcServer> servers = new ConcurrentHashMap<>();
 
@@ -54,7 +63,9 @@ public class LocalRpcServer implements RpcServer {
 
     private List<ConnectionClosedEventListener> listeners = new CopyOnWriteArrayList<>();
 
-    BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch use some kind of MPSC queue.
+    BlockingQueue<Object[]> incoming = new LinkedBlockingDeque<>(); // TODO asch OOM is possible, handle that.
+
+    private ExecutorService defaultExecutor;
 
     public LocalRpcServer(Endpoint local) {
         this.local = local;
@@ -142,20 +153,34 @@ public class LocalRpcServer implements RpcServer {
                             }
                         }
 
-                        prc.handleRequest(new RpcContext() {
-                            @Override public void sendResponse(Object responseObj) {
-                                fut.complete(responseObj);
-                            }
+                        RpcProcessor.ExecutorSelector selector = prc.executorSelector();
 
-                            @Override public Connection getConnection() {
-                                return conn;
-                            }
+                        Executor executor = null;
 
-                            @Override public String getRemoteAddress() {
-                                return sender.toString();
-                            }
-                        }, msg);
+                        if (selector != null) {
+                            executor = selector.select(null, msg);
+                        }
+
+                        if (executor == null)
+                            executor = defaultExecutor;
+
+                        RpcProcessor finalPrc = prc;
+
+                        executor.execute(() -> {
+                            finalPrc.handleRequest(new RpcContext() {
+                                @Override public void sendResponse(Object responseObj) {
+                                    fut.complete(responseObj);
+                                }
+
+                                @Override public Connection getConnection() {
+                                    return conn;
+                                }
 
+                                @Override public String getRemoteAddress() {
+                                    return sender.toString();
+                                }
+                            }, msg);
+                        });
                     } catch (InterruptedException e) {
                         return;
                     }
@@ -163,7 +188,9 @@ public class LocalRpcServer implements RpcServer {
             }
         });
 
-        worker.setName("LocalRPCServer-Thread: "  + local.toString());
+        defaultExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("LocalRPCServer-Default-Executor-Thread: " + local.toString()));
+
+        worker.setName("LocalRPCServer-Dispatch-Thread: "  + local.toString());
         worker.start();
 
         servers.put(local, this);
@@ -178,6 +205,7 @@ public class LocalRpcServer implements RpcServer {
             return;
 
         started = false;
+
         worker.interrupt();
         try {
             worker.join();
@@ -185,6 +213,17 @@ public class LocalRpcServer implements RpcServer {
             throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
         }
 
+        defaultExecutor.shutdownNow();
+
+        try {
+            boolean stopped = defaultExecutor.awaitTermination(60_000, TimeUnit.MILLISECONDS);
+
+            if (!stopped) // TODO asch make thread dump.
+                LOG.error("Failed to wait for graceful executor shutdown, probably some task is hanging.");
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for RPC server to stop " + local);
+        }
+
         // Close all connections to this server.
         for (LocalRpcClient client : conns.keySet())
             closeConnection(client, local);
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
index 811bc2a..8b45827 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/AppendEntriesRequestProcessor.java
@@ -66,9 +66,9 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn
             super();
         }
 
-        @Override
+        @Override // TODO asch should be select(Message msg)
         public Executor select(final String reqClass, final Object reqHeader) {
-            final AppendEntriesRequestHeader header = (AppendEntriesRequestHeader) reqHeader;
+            final AppendEntriesRequest header = (AppendEntriesRequest) reqHeader;
             final String groupId = header.getGroupId();
             final String peerId = header.getPeerId();
             final String serverId = header.getServerId();
diff --git a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
index d6186af..10f15b3 100644
--- a/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
+++ b/modules/raft/src/main/java/com/alipay/sofa/jraft/rpc/impl/core/InstallSnapshotRequestProcessor.java
@@ -30,6 +30,7 @@ import com.alipay.sofa.jraft.rpc.Message;
  * @author boyan (boyan@alibaba-inc.com)
  *
  * 2018-Apr-08 6:09:34 PM
+ * TODO asch use dedicated executor for potentially long jobs ?
  */
 public class InstallSnapshotRequestProcessor extends NodeRequestProcessor<InstallSnapshotRequest> {