You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ad...@apache.org on 2022/08/07 06:28:13 UTC
[ratis] branch master updated: RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9bbb4401b RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
9bbb4401b is described below
commit 9bbb4401b832a69035bf0b186bb9525bf6aadeb9
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Sun Aug 7 08:28:09 2022 +0200
RATIS-1656. Leftover usage of ForkJoinPool.commonPool() in RaftServerImpl (#702)
---
.../src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
.../main/java/org/apache/ratis/server/impl/RaftServerProxy.java | 4 +++-
.../java/org/apache/ratis/InstallSnapshotNotificationTests.java | 7 ++++++-
3 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fe9f87ce9..6ab0a6a5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1414,7 +1414,8 @@ class RaftServerImpl implements RaftServer.Division,
}
}
return JavaUtils.allOf(futures).whenCompleteAsync(
- (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE))
+ (r, t) -> followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)),
+ serverExecutor
).thenApply(v -> {
final AppendEntriesReplyProto reply;
synchronized(this) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 96f7efbe1..ad4d988ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -115,7 +115,9 @@ class RaftServerProxy implements RaftServer {
return;
}
isClosed = true;
- map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+ ConcurrentUtils.parallelForEachAsync(map.entrySet(),
+ entry -> close(entry.getKey(), entry.getValue()),
+ executor);
}
private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 4476f3ecf..215e8408f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -51,6 +51,8 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -85,6 +87,9 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
private static final AtomicInteger numNotifyInstallSnapshotFinished = new AtomicInteger();
private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
+
+ private final Executor stateMachineExecutor = Executors.newSingleThreadExecutor();
+
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RaftProtos.RoleInfoProto roleInfoProto,
@@ -120,7 +125,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
return leaderSnapshotInfo.getTermIndex();
};
- return CompletableFuture.supplyAsync(supplier);
+ return CompletableFuture.supplyAsync(supplier, stateMachineExecutor);
}
@Override