You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by ad...@apache.org on 2022/08/07 06:28:13 UTC

[ratis] branch master updated: RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bbb4401b RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
9bbb4401b is described below

commit 9bbb4401b832a69035bf0b186bb9525bf6aadeb9
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Sun Aug 7 08:28:09 2022 +0200

    RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
---
 .../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
 .../main/java/org/apache/ratis/server/impl/RaftServerProxy.java    | 4 +++-
 .../java/org/apache/ratis/InstallSnapshotNotificationTests.java    | 7 ++++++-
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe9f87ce9..6ab0a6a5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1414,7 +1414,8 @@ class RaftServerImpl implements RaftServer.Division,
       }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
-        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+        (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
+        serverExecutor
     ).thenApply(v -> {
       final AppendEntriesReplyProto reply;
       synchronized(this) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 96f7efbe1..ad4d988ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -115,7 +115,9 @@ class RaftServerProxy implements RaftServer {
         return;
       }
       isClosed = true;
-      map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+      ConcurrentUtils.parallelForEachAsync(map.entrySet(),
+          entry -> close(entry.getKey(), entry.getValue()),
+          executor);
     }
 
     private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 4476f3ecf..215e8408f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -51,6 +51,8 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -85,6 +87,9 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   private static final AtomicInteger numNotifyInstallSnapshotFinished = new AtomicInteger();
 
   private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
+
+    private final Executor stateMachineExecutor = Executors.newSingleThreadExecutor();
+
     @Override
     public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
         RaftProtos.RoleInfoProto roleInfoProto,
@@ -120,7 +125,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
         return leaderSnapshotInfo.getTermIndex();
       };
 
-      return CompletableFuture.supplyAsync(supplier);
+      return CompletableFuture.supplyAsync(supplier, stateMachineExecutor);
     }
 
     @Override