You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/11/30 13:21:55 UTC
[ignite-3] branch main updated: IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2c97228 IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.
2c97228 is described below
commit 2c9722861f990d8b77359257fc1b32862fd18236
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Tue Nov 30 16:21:36 2021 +0300
IGNITE-15528 Fixed a thread leak in jraft, introduced shared pools mode.
---
.../ignite/raft/jraft/core/ItCliServiceTest.java | 7 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 76 +---
.../raft/server/ItJraftCounterServerTest.java | 138 +++++++-
.../internal/raft/server/impl/JraftServerImpl.java | 40 +++
.../org/apache/ignite/raft/jraft/JRaftUtils.java | 52 +--
.../apache/ignite/raft/jraft/core/NodeImpl.java | 392 +++++++++++++--------
.../raft/jraft/core/ReadOnlyServiceImpl.java | 2 +-
.../apache/ignite/raft/jraft/core/Replicator.java | 2 +-
.../raft/jraft/core/StateMachineAdapter.java | 7 +
.../ignite/raft/jraft/option/NodeOptions.java | 212 +++++++----
.../ignite/raft/jraft/option/RpcOptions.java | 67 +---
.../ignite/raft/jraft/rpc/ClientService.java | 14 +-
.../raft/jraft/rpc/impl/AbstractClientService.java | 84 ++---
.../raft/jraft/rpc/impl/IgniteRpcClient.java | 26 +-
.../raft/jraft/rpc/impl/IgniteRpcServer.java | 11 +-
.../raft/jraft/rpc/impl/PingRequestProcessor.java | 5 +
.../raft/jraft/storage/impl/LogManagerImpl.java | 2 +-
.../storage/snapshot/SnapshotExecutorImpl.java | 2 +-
.../ignite/raft/jraft/util/timer/DefaultTimer.java | 2 +-
.../internal/raft/server/impl/RaftServerImpl.java | 4 +-
.../ignite/raft/jraft/core/MockStateMachine.java | 5 -
.../raft/jraft/core/ReadOnlyServiceTest.java | 1 -
.../apache/ignite/raft/jraft/core/TestCluster.java | 104 ++----
.../raft/jraft/storage/SnapshotExecutorTest.java | 2 +-
.../apache/ignite/raft/jraft/test/TestUtils.java | 13 +
25 files changed, 737 insertions(+), 533 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index eea4acc..c3993b3 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -154,6 +154,8 @@ public class ItCliServiceTest {
cluster.stopAll();
ExecutorServiceHelper.shutdownAndAwaitTermination(clientExecutor);
+ TestUtils.assertAllJraftThreadsStopped();
+
LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName());
}
@@ -314,7 +316,8 @@ public class ItCliServiceTest {
assertNotNull(oldLeaderNode);
PeerId oldLeader = oldLeaderNode.getNodeId().getPeerId();
assertNotNull(oldLeader);
- assertTrue(cliService.changePeers(groupId, conf, new Configuration(newPeers)).isOk());
+ Status status = cliService.changePeers(groupId, conf, new Configuration(newPeers));
+ assertTrue(status.isOk(), status.getErrorMsg());
cluster.waitLeader();
PeerId newLeader = cluster.getLeader().getNodeId().getPeerId();
assertNotEquals(oldLeader, newLeader);
@@ -479,7 +482,7 @@ public class ItCliServiceTest {
for (Map.Entry<String, PeerId> entry : rebalancedLeaderIds.entrySet())
ret.compute(entry.getValue(), (ignored, num) -> num == null ? 1 : num + 1);
for (Map.Entry<PeerId, Integer> entry : ret.entrySet()) {
- System.out.println(entry);
+ LOG.info(entry.toString());
assertEquals(new PeerId("host_1", 8080), entry.getKey());
}
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 8240aac..2e403ae 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -60,7 +60,6 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
@@ -81,7 +80,6 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -159,12 +157,7 @@ public class ItNodeTest {
private final List<FixedThreadsExecutorGroup> appendEntriesExecutors = new ArrayList<>();
/** Test info. */
- private final TestInfo testInfo;
-
- /** */
- public ItNodeTest(TestInfo testInfo) {
- this.testInfo = testInfo;
- }
+ private TestInfo testInfo;
@BeforeAll
public static void setupNodeTest() {
@@ -182,9 +175,10 @@ public class ItNodeTest {
}
@BeforeEach
- public void setup(@WorkDirectory Path workDir) throws Exception {
+ public void setup(TestInfo testInfo, @WorkDirectory Path workDir) throws Exception {
LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName());
+ this.testInfo = testInfo;
dataPath = workDir.toString();
testStartMs = Utils.monotonicMs();
@@ -211,6 +205,9 @@ public class ItNodeTest {
startedCounter.set(0);
stoppedCounter.set(0);
+
+ TestUtils.assertAllJraftThreadsStopped();
+
LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:"
+ (Utils.monotonicMs() - testStartMs) + " ms.");
}
@@ -260,8 +257,9 @@ public class ItNodeTest {
AtomicInteger c = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
+ int finalI = i;
Task task = new Task(data, new JoinableClosure(status -> {
- System.out.println(status);
+ LOG.info("{} i={}", status, finalI);
if (!status.isOk()) {
assertTrue(
status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
@@ -1509,10 +1507,8 @@ public class ItNodeTest {
PeerId oldLeader = leader.getNodeId().getPeerId();
assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint()));
- // apply something when follower
- //final List<Node> followers = cluster.getFollowers();
assertFalse(followers.isEmpty());
- sendTestTaskAndWait("follower apply ", followers.get(0), -1);
+ sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail because there is no leader.
for (Node follower : followers) {
NodeImpl follower0 = (NodeImpl) follower;
@@ -1551,7 +1547,7 @@ public class ItNodeTest {
cluster.clean(oldLeader.getEndpoint());
// restart old leader
- LOG.info("restart old leader {}", oldLeader);
+ LOG.info("Restart old leader with cleanup {}", oldLeader);
assertTrue(cluster.start(oldLeader.getEndpoint()));
cluster.ensureSame();
@@ -2976,7 +2972,8 @@ public class ItNodeTest {
done.reset();
// works
leader.changePeers(conf, done);
- assertTrue(done.await().isOk());
+ Status await = done.await();
+ assertTrue(await.isOk(), await.getErrorMsg());
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
@@ -3391,16 +3388,7 @@ public class ItNodeTest {
}
private NodeOptions createNodeOptions() {
- NodeOptions options = new NodeOptions();
-
- ExecutorService executor = JRaftUtils.createCommonExecutor(options);
- executors.add(executor);
- options.setCommonExecutor(executor);
- FixedThreadsExecutorGroup appendEntriesExecutor = JRaftUtils.createAppendEntriesExecutor(options);
- appendEntriesExecutors.add(appendEntriesExecutor);
- options.setStripedExecutor(appendEntriesExecutor);
-
- return options;
+ return new NodeOptions();
}
/**
@@ -3478,35 +3466,6 @@ public class ItNodeTest {
Configuration initialConf = nodeOptions.getInitialConf();
nodeOptions.setStripes(1);
- StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
- StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
- StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
- StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
- nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
- "JRaft-FSMCaller-Disruptor_ITNodeTest",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new FSMCallerImpl.ApplyTask(),
- nodeOptions.getStripes()));
-
- nodeOptions.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
- "JRaft-NodeImpl-Disruptor_ITNodeTest",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new NodeImpl.LogEntryAndClosure(),
- nodeOptions.getStripes()));
-
- nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
- "JRaft-ReadOnlyService-Disruptor_ITNodeTest",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
- nodeOptions.getStripes()));
-
- nodeOptions.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
- "JRaft-LogManager-Disruptor_ITNodeTest",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new LogManagerImpl.StableClosureEvent(),
- nodeOptions.getStripes()));
-
Stream<PeerId> peers = initialConf == null ?
Stream.empty() :
Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream());
@@ -3538,14 +3497,11 @@ public class ItNodeTest {
var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) {
@Override public synchronized void shutdown() {
- super.shutdown();
+ rpcServer.shutdown();
+ super.shutdown();
+
clusterService.stop();
-
- fsmCallerDusruptor.shutdown();
- nodeDisruptor.shutdown();
- readOnlyServiceDisruptor.shutdown();
- logManagerDisruptor.shutdown();
}
};
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 4d9124d..7a4770d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -17,6 +17,9 @@
package org.apache.ignite.raft.server;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
@@ -35,6 +38,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -61,9 +67,12 @@ import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -149,6 +158,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
@AfterEach
@Override
protected void after() throws Exception {
+ super.after();
+
LOG.info("Start client shutdown");
Iterator<RaftGroupService> iterClients = clients.iterator();
@@ -170,8 +181,11 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
iterSrv.remove();
- server.stopRaftGroup(COUNTER_GROUP_0);
- server.stopRaftGroup(COUNTER_GROUP_1);
+ Set<String> grps = server.startedGroups();
+
+ for (String grp : grps) {
+ server.stopRaftGroup(grp);
+ }
server.beforeNodeStop();
@@ -180,7 +194,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
- super.after();
+ TestUtils.assertAllJraftThreadsStopped();
LOG.info(">>>>>>>>>>>>>>> End test method: {}", testInfo.getTestMethod().orElseThrow().getName());
}
@@ -189,14 +203,21 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
* Starts server.
*
* @param idx The index.
+ * @param clo Init closure.
+ * @param cons Node options updater.
+ *
* @return Raft server instance.
*/
- private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo) {
+ private JraftServerImpl startServer(int idx, Consumer<RaftServer> clo, Consumer<NodeOptions> cons) {
var addr = new NetworkAddress(getLocalAddress(), PORT);
ClusterService service = clusterService(PORT + idx, List.of(addr), true);
- JraftServerImpl server = new JraftServerImpl(service, dataPath) {
+ NodeOptions opts = new NodeOptions();
+
+ cons.accept(opts);
+
+ JraftServerImpl server = new JraftServerImpl(service, dataPath, opts) {
@Override
public void stop() {
servers.remove(this);
@@ -248,7 +269,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
startServer(i, raftServer -> {
raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
- });
+ }, opts -> {});
}
startClient(COUNTER_GROUP_0);
@@ -262,13 +283,13 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
public void testDisruptorThreadsCount() {
startServer(0, raftServer -> {
raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
- });
+ }, opts -> {});
Set<Thread> threads = getAllDisruptorCurrentThreads();
int threadsBefore = threads.size();
- Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+ Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(toSet());
assertEquals(NodeOptions.DEFAULT_STRIPES * 4/*services*/, threadsBefore, "Started thread names: " + threadNamesBefore);
@@ -282,7 +303,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
int threadsAfter = threads.size();
- Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+ Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(toSet());
threadNamesAfter.removeAll(threadNamesBefore);
@@ -305,11 +326,11 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
@NotNull
private Set<Thread> getAllDisruptorCurrentThreads() {
return Thread.getAllStackTraces().keySet().stream().filter(t ->
- t.getName().contains("JRaft-FSMCaller-Disruptor")
- || t.getName().contains("JRaft-NodeImpl-Disruptor")
- || t.getName().contains("JRaft-ReadOnlyService-Disruptor")
- || t.getName().contains("JRaft-LogManager-Disruptor"))
- .collect(Collectors.toSet());
+ t.getName().contains("JRaft-FSMCaller-Disruptor")
+ || t.getName().contains("JRaft-NodeImpl-Disruptor")
+ || t.getName().contains("JRaft-ReadOnlyService-Disruptor")
+ || t.getName().contains("JRaft-LogManager-Disruptor"))
+ .collect(toSet());
}
@Test
@@ -652,6 +673,91 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
doTestFollowerCatchUp(true, false);
}
+ /** Tests that starting new groups in shared pools mode doesn't increase the timer threads count. */
+ @Test
+ public void testTimerThreadsCount() {
+ JraftServerImpl srv0 = startServer(0, x -> {}, opts -> opts.setTimerPoolSize(1));
+ JraftServerImpl srv1 = startServer(1, x -> {}, opts -> opts.setTimerPoolSize(1));
+ JraftServerImpl srv2 = startServer(2, x -> {}, opts -> opts.setTimerPoolSize(1));
+
+ waitForTopology(srv0.clusterService(), 3, 5_000);
+
+ ExecutorService svc = Executors.newFixedThreadPool(16);
+
+ final int groupsCnt = 10;
+
+ try {
+ List<Future<?>> futs = new ArrayList<>(groupsCnt);
+
+ for (int i = 0; i < groupsCnt; i++) {
+ int finalI = i;
+ futs.add(svc.submit(new Runnable() {
+ @Override public void run() {
+ String grp = "counter" + finalI;
+
+ srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+ srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+ srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+ }
+ }));
+ }
+
+ for (Future<?> fut : futs) {
+ try {
+ fut.get();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ } finally {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(svc);
+ }
+
+ for (int i = 0; i < groupsCnt; i++) {
+ String grp = "counter" + i;
+
+ assertTrue(waitForCondition(() -> hasLeader(grp), 30_000));
+ }
+
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+
+ LOG.info("RAFT threads count {}", threads.stream().filter(t -> t.getName().contains("JRaft")).count());
+
+ List<Thread> timerThreads = threads.stream().filter(this::isTimer).sorted(comparing(Thread::getName)).collect(toList());
+
+ assertTrue(timerThreads.size() <= 15, // This is the maximum possible number of timer threads for 3 nodes in this test.
+ "All timer threads: " + timerThreads.toString());
+ }
+
+ /**
+ * Returns {@code true} if thread is related to timers.
+ *
+ * @param thread The thread.
+ * @return {@code True} if a timer thread.
+ */
+ private boolean isTimer(Thread thread) {
+ String name = thread.getName();
+
+ return name.contains("ElectionTimer") || name.contains("VoteTimer")
+ || name.contains("StepDownTimer") || name.contains("SnapshotTimer") || name.contains("Node-Scheduler");
+ }
+
+ /**
+ * Returns {@code true} if a raft group has elected a leader for its current term.
+ *
+ * @param grpId Group id.
+ * @return {@code True} if a leader is elected.
+ */
+ private boolean hasLeader(String grpId) {
+ return servers.stream().anyMatch(s -> {
+ NodeImpl node = (NodeImpl) s.raftGroupService(grpId).getRaftNode();
+
+ StateMachineAdapter fsm = (StateMachineAdapter) node.getOptions().getFsm();
+
+ return node.isLeader() && fsm.getLeaderTerm() == node.getCurrentTerm();
+ });
+ }
+
/**
* Do test follower catch up.
*
@@ -721,7 +827,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
var svc2 = startServer(stopIdx, r -> {
r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
- });
+ }, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc2, COUNTER_GROUP_1), 5_000);
@@ -736,7 +842,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
var svc3 = startServer(stopIdx, r -> {
r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
- });
+ }, opts -> {});
waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
waitForCondition(() -> validateStateMachine(sum(30), svc3, COUNTER_GROUP_1), 5_000);
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index e33fd47..30f2c01 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -115,6 +115,11 @@ public class JraftServerImpl implements RaftServer {
this.nodeManager = new NodeManager();
this.opts = opts;
+ // Auto-adjust options.
+ this.opts.setRpcConnectTimeoutMs(this.opts.getElectionTimeoutMs() / 3);
+ this.opts.setRpcDefaultTimeout(this.opts.getElectionTimeoutMs() / 2);
+ this.opts.setSharedPools(true);
+
if (opts.getServerName() == null) {
opts.setServerName(service.localConfiguration().getName());
}
@@ -123,6 +128,9 @@ public class JraftServerImpl implements RaftServer {
/** {@inheritDoc} */
@Override
public void start() {
+ assert opts.isSharedPools() : "RAFT server is supposed to run in shared pools mode";
+
+ // Pre-create all pools in shared mode.
if (opts.getCommonExecutor() == null) {
opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
}
@@ -139,6 +147,22 @@ public class JraftServerImpl implements RaftServer {
opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
}
+ if (opts.getVoteTimer() == null) {
+ opts.setVoteTimer(JRaftUtils.createTimer(opts, "JRaft-VoteTimer"));
+ }
+
+ if (opts.getElectionTimer() == null) {
+ opts.setElectionTimer(JRaftUtils.createTimer(opts, "JRaft-ElectionTimer"));
+ }
+
+ if (opts.getStepDownTimer() == null) {
+ opts.setStepDownTimer(JRaftUtils.createTimer(opts, "JRaft-StepDownTimer"));
+ }
+
+ if (opts.getSnapshotTimer() == null) {
+ opts.setSnapshotTimer(JRaftUtils.createTimer(opts, "JRaft-SnapshotTimer"));
+ }
+
requestExecutor = JRaftUtils.createRequestExecutor(opts);
rpcServer = new IgniteRpcServer(
@@ -218,6 +242,22 @@ public class JraftServerImpl implements RaftServer {
opts.getScheduler().shutdown();
}
+ if (opts.getElectionTimer() != null) {
+ opts.getElectionTimer().stop();
+ }
+
+ if (opts.getVoteTimer() != null) {
+ opts.getVoteTimer().stop();
+ }
+
+ if (opts.getStepDownTimer() != null) {
+ opts.getStepDownTimer().stop();
+ }
+
+ if (opts.getSnapshotTimer() != null) {
+ opts.getSnapshotTimer().stop();
+ }
+
if (opts.getClientExecutor() != null) {
ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 6ad6349..5f5c459 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -24,23 +24,21 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.Scheduler;
import org.apache.ignite.raft.jraft.core.TimerManager;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RpcOptions;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.ThreadPoolUtil;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.DefaultFixedThreadsExecutorGroupFactory;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.timer.DefaultTimer;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
/**
* Some helper methods for jraft usage.
@@ -59,44 +57,10 @@ public final class JRaftUtils {
nodeOpts.setStripes(1);
- StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
- StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
- StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
- StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
- nodeOpts.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
- "JRaft-FSMCaller-Disruptor_bootstrap",
- nodeOpts.getRaftOptions().getDisruptorBufferSize(),
- () -> new FSMCallerImpl.ApplyTask(),
- nodeOpts.getStripes()));
-
- nodeOpts.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
- "JRaft-NodeImpl-Disruptor_bootstrap",
- nodeOpts.getRaftOptions().getDisruptorBufferSize(),
- () -> new NodeImpl.LogEntryAndClosure(),
- nodeOpts.getStripes()));
-
- nodeOpts.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
- "JRaft-ReadOnlyService-Disruptor_bootstrap",
- nodeOpts.getRaftOptions().getDisruptorBufferSize(),
- () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
- nodeOpts.getStripes()));
-
- nodeOpts.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
- "JRaft-LogManager-Disruptor_bootstrap",
- nodeOpts.getRaftOptions().getDisruptorBufferSize(),
- () -> new LogManagerImpl.StableClosureEvent(),
- nodeOpts.getStripes()));
-
final boolean ret = node.bootstrap(opts);
node.shutdown();
node.join();
- fsmCallerDusruptor.shutdown();
- nodeDisruptor.shutdown();
- readOnlyServiceDisruptor.shutdown();
- logManagerDisruptor.shutdown();
-
return ret;
}
@@ -187,6 +151,18 @@ public final class JRaftUtils {
}
/**
+ * @param opts Node options.
+ * @param name The name.
+ * @return The timer.
+ */
+ public static Timer createTimer(NodeOptions opts, String name) {
+ return new DefaultTimer(
+ opts.getTimerPoolSize(),
+ NamedThreadFactory.threadPrefix(opts.getServerName(), name)
+ );
+ }
+
+ /**
* Create a striped executor.
*
* @param prefix Thread name prefix.
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4fce934..8957c46 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -33,11 +33,13 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.JRaftServiceFactory;
+import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.ReadOnlyService;
import org.apache.ignite.raft.jraft.ReplicatorGroup;
@@ -105,6 +107,7 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Describer;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
@@ -114,7 +117,6 @@ import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.ThreadId;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.LongHeldDetectingReadWriteLock;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
/**
* The raft replica node implementation.
@@ -170,19 +172,13 @@ public class NodeImpl implements Node, RaftServerService {
private final List<Closure> shutdownContinuations = new ArrayList<>();
private RaftClientService rpcClientService;
private ReadOnlyService readOnlyService;
+
/**
* Timers
*/
- private Scheduler timerManager;
private RepeatedTimer electionTimer;
private RepeatedTimer voteTimer;
- private RepeatedTimer stepDownTimer;
-
- private boolean sharedTimerManager;
-
- /**
- * Triggered on a leader each electionTimeoutMs / 2 milliseconds to ensure the alive quorum.
- */
+ private RepeatedTimer stepDownTimer; // Triggered on a leader each electionTimeoutMs / 2 millis to ensure the quorum.
private RepeatedTimer snapshotTimer;
private ScheduledFuture<?> transferTimer;
private ThreadId wakingCandidate;
@@ -214,11 +210,6 @@ public class NodeImpl implements Node, RaftServerService {
*/
private volatile int electionTimeoutCounter;
- /**
- * Timer factory.
- */
- private RaftTimerFactory timerFactory;
-
private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {
static final long MAX_BLOCKING_MS_TO_REPORT = SystemPropertyUtil.getLong(
"jraft.node.detecting.lock.max_blocking_ms_to_report", -1);
@@ -788,6 +779,8 @@ public class NodeImpl implements Node, RaftServerService {
// Create fsmCaller at first as logManager needs it to report error
this.fsmCaller = new FSMCallerImpl();
+ initPools(opts.getNodeOptions());
+
if (!initLogStorage()) {
LOG.error("Fail to init log storage.");
return false;
@@ -873,7 +866,6 @@ public class NodeImpl implements Node, RaftServerService {
this.metrics = new NodeMetrics(opts.isEnableMetrics());
this.serverId.setPriority(opts.getElectionPriority());
this.electionTimeoutCounter = 0;
- this.timerFactory = opts.getServiceFactory().createRaftTimerFactory();
if (opts.getReplicationStateListeners() != null)
this.replicatorStateListeners.addAll(opts.getReplicationStateListeners());
@@ -882,77 +874,11 @@ public class NodeImpl implements Node, RaftServerService {
return false;
}
- // Init timers
- final String suffix = getOptions().getServerName() + "-";
-
- if (getOptions().getScheduler() == null)
- timerManager = timerFactory.createScheduler(this.options.getTimerPoolSize(),
- "JRaft-Node-ScheduleThreadPool-" + suffix);
- else {
- sharedTimerManager = true;
- timerManager = getOptions().getScheduler();
- }
-
- String name = "JRaft-VoteTimer-" + suffix;
- this.voteTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getVoteTimer(name)) {
- @Override
- protected void onTrigger() {
- handleVoteTimeout();
- }
-
- @Override
- protected int adjustTimeout(final int timeoutMs) {
- return randomTimeout(timeoutMs);
- }
- };
-
- name = "JRaft-ElectionTimer-" + suffix;
- electionTimer = new RepeatedTimer(name, options.getElectionTimeoutMs(), timerFactory.getElectionTimer(name)) {
- @Override
- protected void onTrigger() {
- handleElectionTimeout();
- }
-
- @Override
- protected int adjustTimeout(final int timeoutMs) {
- return randomTimeout(timeoutMs);
- }
- };
-
- name = "JRaft-StepDownTimer-" + suffix;
- stepDownTimer = new RepeatedTimer(name, options.getElectionTimeoutMs() >> 1, timerFactory.getStepDownTimer(name)) {
- @Override
- protected void onTrigger() {
- handleStepDownTimeout();
- }
- };
-
- name = "JRaft-SnapshotTimer-" + suffix;
- snapshotTimer = new RepeatedTimer(name, options.getSnapshotIntervalSecs() * 1000, timerFactory.getSnapshotTimer(name)) {
- private volatile boolean firstSchedule = true;
-
- @Override
- protected void onTrigger() {
- handleSnapshotTimeout();
- }
-
- @Override
- protected int adjustTimeout(final int timeoutMs) {
- if (!this.firstSchedule) {
- return timeoutMs;
- }
+ // Init timers.
+ initTimers(opts);
- // Randomize the first snapshot trigger timeout
- this.firstSchedule = false;
- if (timeoutMs > 0) {
- int half = timeoutMs / 2;
- return half + ThreadLocalRandom.current().nextInt(half);
- }
- else {
- return timeoutMs;
- }
- }
- };
+ // Init pools.
+ initPools(opts);
this.configManager = new ConfigurationManager();
@@ -1027,7 +953,7 @@ public class NodeImpl implements Node, RaftServerService {
rgOpts.setRaftRpcClientService(this.rpcClientService);
rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null);
rgOpts.setRaftOptions(this.raftOptions);
- rgOpts.setTimerManager(this.timerManager);
+ rgOpts.setTimerManager(this.options.getScheduler());
// Adds metric registry to RPC service.
this.options.setMetricRegistry(this.metrics.getMetricRegistry());
@@ -1083,6 +1009,146 @@ public class NodeImpl implements Node, RaftServerService {
return true;
}
+ /**
+ * Checks whether a missing pool option may be created by this node.
+ *
+ * <p>In this class it is called after the option was found unset. When shared pools are enabled, every pool
+ * must be supplied through the options, so a missing one is a configuration error.
+ *
+ * @param opts Options.
+ * @param name Option name, used in the error message.
+ * @return Always {@code true}, so the call can guard the creation of a node-private default.
+ * @throws IllegalArgumentException If shared pools are enabled.
+ */
+ private boolean validateOption(NodeOptions opts, String name) {
+ if (opts.isSharedPools()) {
+ throw new IllegalArgumentException(name + " is required if shared pools are enabled");
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Initializes the scheduler and the vote, election, step-down and snapshot timers.
+ *
+ * <p>Each scheduler/timer not already set in {@code opts} is created via {@link JRaftUtils} and stored back into
+ * {@code opts}; when shared pools are enabled a missing one makes {@link #validateOption} throw instead.
+ *
+ * @param opts The options.
+ * @throws IllegalArgumentException If shared pools are enabled and the scheduler or a timer is not set.
+ */
+ private void initTimers(final NodeOptions opts) {
+ if (opts.getScheduler() == null && validateOption(opts, "scheduler"))
+ opts.setScheduler(JRaftUtils.createScheduler(opts));
+
+ // NOTE(review): timeouts below are read from NodeImpl.this.options while timers come from opts;
+ // presumably the same instance at this point — confirm.
+ String name = "JRaft-VoteTimer";
+ if (opts.getVoteTimer() == null && validateOption(opts, "voteTimer")) {
+ opts.setVoteTimer(JRaftUtils.createTimer(opts, name));
+ }
+
+ // Vote and election timeouts are randomized on every scheduling via randomTimeout().
+ this.voteTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs(), opts.getVoteTimer()) {
+ @Override
+ protected void onTrigger() {
+ handleVoteTimeout();
+ }
+
+ @Override
+ protected int adjustTimeout(final int timeoutMs) {
+ return randomTimeout(timeoutMs);
+ }
+ };
+
+ name = "JRaft-ElectionTimer";
+ if (opts.getElectionTimer() == null && validateOption(opts, "electionTimer"))
+ opts.setElectionTimer(JRaftUtils.createTimer(opts, name));
+ electionTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs(), opts.getElectionTimer()) {
+ @Override
+ protected void onTrigger() {
+ handleElectionTimeout();
+ }
+
+ @Override
+ protected int adjustTimeout(final int timeoutMs) {
+ return randomTimeout(timeoutMs);
+ }
+ };
+
+ name = "JRaft-StepDownTimer";
+ if (opts.getStepDownTimer() == null && validateOption(opts, "stepDownTimer"))
+ opts.setStepDownTimer(JRaftUtils.createTimer(opts, name));
+ // Step-down checks use half of the election timeout as their period.
+ stepDownTimer = new RepeatedTimer(name, NodeImpl.this.options.getElectionTimeoutMs() >> 1, opts.getStepDownTimer()) {
+ @Override
+ protected void onTrigger() {
+ handleStepDownTimeout();
+ }
+ };
+
+ name = "JRaft-SnapshotTimer";
+ if (opts.getSnapshotTimer() == null && validateOption(opts, "snapshotTimer"))
+ opts.setSnapshotTimer(JRaftUtils.createTimer(opts, name));
+ // Period is the snapshot interval converted from seconds to milliseconds.
+ snapshotTimer = new RepeatedTimer(name, NodeImpl.this.options.getSnapshotIntervalSecs() * 1000, opts.getSnapshotTimer()) {
+ private volatile boolean firstSchedule = true;
+
+ @Override
+ protected void onTrigger() {
+ handleSnapshotTimeout();
+ }
+
+ @Override
+ protected int adjustTimeout(final int timeoutMs) {
+ if (!this.firstSchedule) {
+ return timeoutMs;
+ }
+
+ // Randomize the first snapshot trigger timeout
+ // The first delay falls in [half, 2 * half), i.e. roughly [timeoutMs / 2, timeoutMs).
+ // NOTE(review): nextInt(0) throws for timeoutMs == 1; unreachable if timeoutMs is always a multiple of 1000 — confirm.
+ this.firstSchedule = false;
+ if (timeoutMs > 0) {
+ int half = timeoutMs / 2;
+ return half + ThreadLocalRandom.current().nextInt(half);
+ }
+ else {
+ return timeoutMs;
+ }
+ }
+ };
+ }
+
+ /**
+ * Initializes the executors and striped disruptors used by the node.
+ *
+ * <p>Each pool not already set in {@code opts} is created and stored back into {@code opts}. When shared pools are
+ * enabled, every pool must be supplied by the caller: a missing one makes {@link #validateOption} throw.
+ *
+ * @param opts Options.
+ * @throws IllegalArgumentException If shared pools are enabled and a pool is not set.
+ */
+ private void initPools(final NodeOptions opts) {
+ if (opts.getCommonExecutor() == null && validateOption(opts, "commonExecutor"))
+ opts.setCommonExecutor(JRaftUtils.createCommonExecutor(opts));
+
+ if (opts.getStripedExecutor() == null && validateOption(opts, "stripedExecutor"))
+ opts.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(opts));
+
+ if (opts.getClientExecutor() == null && validateOption(opts, "clientExecutor"))
+ opts.setClientExecutor(JRaftUtils.createClientExecutor(opts, opts.getServerName()));
+
+ // Disruptors are validated like the executors: on stop the node only shuts pools down when shared pools are
+ // disabled, so disruptors created here in shared pools mode would never be stopped and would leak threads.
+ if (opts.getfSMCallerExecutorDisruptor() == null && validateOption(opts, "fSMCallerExecutorDisruptor")) {
+ opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
+ NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-FSMCaller-Disruptor"),
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new FSMCallerImpl.ApplyTask(),
+ opts.getStripes()));
+ }
+
+ if (opts.getNodeApplyDisruptor() == null && validateOption(opts, "nodeApplyDisruptor")) {
+ opts.setNodeApplyDisruptor(new StripedDisruptor<NodeImpl.LogEntryAndClosure>(
+ NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-NodeImpl-Disruptor"),
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new NodeImpl.LogEntryAndClosure(),
+ opts.getStripes()));
+ }
+
+ if (opts.getReadOnlyServiceDisruptor() == null && validateOption(opts, "readOnlyServiceDisruptor")) {
+ opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(
+ NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-ReadOnlyService-Disruptor"),
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ opts.getStripes()));
+ }
+
+ if (opts.getLogManagerDisruptor() == null && validateOption(opts, "logManagerDisruptor")) {
+ opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(
+ NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-LogManager-Disruptor"),
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new LogManagerImpl.StableClosureEvent(),
+ opts.getStripes()));
+ }
+ }
+
@OnlyForTest
void tryElectSelf() {
this.writeLock.lock();
@@ -1130,22 +1196,25 @@ public class NodeImpl implements Node, RaftServerService {
if (peer.equals(this.serverId)) {
continue;
}
- if (!this.rpcClientService.connect(peer.getEndpoint())) {
- LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
- continue;
- }
- final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
- done.request = raftOptions.getRaftMessagesFactory()
- .requestVoteRequest()
- .preVote(false) // It's not a pre-vote request.
- .groupId(this.groupId)
- .serverId(this.serverId.toString())
- .peerId(peer.toString())
- .term(this.currTerm)
- .lastLogIndex(lastLogId.getIndex())
- .lastLogTerm(lastLogId.getTerm())
- .build();
- this.rpcClientService.requestVote(peer.getEndpoint(), done.request, done);
+
+ // Read the term synchronously, in the loop, not inside the asynchronous callback: the callback runs later on
+ // another thread, outside whatever lock the caller holds, and could race with or observe a newer term.
+ final long voteTerm = this.currTerm;
+ rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> {
+ if (!ok) {
+ LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint());
+ return;
+ }
+ final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, voteTerm, this);
+ done.request = raftOptions.getRaftMessagesFactory()
+ .requestVoteRequest()
+ .preVote(false) // It's not a pre-vote request.
+ .groupId(this.groupId)
+ .serverId(this.serverId.toString())
+ .peerId(peer.toString())
+ .term(voteTerm)
+ .lastLogIndex(lastLogId.getIndex())
+ .lastLogTerm(lastLogId.getTerm())
+ .build();
+ this.rpcClientService.requestVote(peer.getEndpoint(), done.request, done);
+ });
}
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
@@ -2414,17 +2483,13 @@ public class NodeImpl implements Node, RaftServerService {
return this.options;
}
- public Scheduler getTimerManager() {
- return this.timerManager;
- }
-
@Override
public RaftOptions getRaftOptions() {
return this.raftOptions;
}
@OnlyForTest
- long getCurrentTerm() {
+ public long getCurrentTerm() {
this.readLock.lock();
try {
return this.currTerm;
@@ -2641,9 +2706,10 @@ public class NodeImpl implements Node, RaftServerService {
@Override
public void run(final Status status) {
- NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
+ long latency = Utils.monotonicMs() - this.startMs;
+ NodeImpl.this.metrics.recordLatency("pre-vote", latency);
if (!status.isOk()) {
- LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
+ // Arguments must follow placeholder order: node, peer, latency, then the error status.
+ LOG.warn("Node {} PreVote to {} latency={} error: {}.", getNodeId(), this.peer, latency, status);
}
else {
handlePreVoteResponse(this.peer, this.term, getResponse());
@@ -2687,22 +2753,25 @@ public class NodeImpl implements Node, RaftServerService {
if (peer.equals(this.serverId)) {
continue;
}
- if (!this.rpcClientService.connect(peer.getEndpoint())) {
- LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
- continue;
- }
- final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
- done.request = raftOptions.getRaftMessagesFactory()
- .requestVoteRequest()
- .preVote(true) // it's a pre-vote request.
- .groupId(this.groupId)
- .serverId(this.serverId.toString())
- .peerId(peer.toString())
- .term(this.currTerm + 1) // next term
- .lastLogIndex(lastLogId.getIndex())
- .lastLogTerm(lastLogId.getTerm())
- .build();
- this.rpcClientService.preVote(peer.getEndpoint(), done.request, done);
+
+ // Read the term synchronously, in the loop, not inside the asynchronous callback: the callback runs later on
+ // another thread, outside whatever lock the caller holds, and could race with or observe a newer term.
+ final long preVoteTerm = this.currTerm;
+ rpcClientService.connectAsync(peer.getEndpoint()).thenAccept(ok -> {
+ if (!ok) {
+ LOG.warn("Node {} failed to init channel, address={}.", getNodeId(), peer.getEndpoint());
+ return;
+ }
+ final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, preVoteTerm);
+ done.request = raftOptions.getRaftMessagesFactory()
+ .requestVoteRequest()
+ .preVote(true) // it's a pre-vote request.
+ .groupId(this.groupId)
+ .serverId(this.serverId.toString())
+ .peerId(peer.toString())
+ .term(preVoteTerm + 1) // next term
+ .lastLogIndex(lastLogId.getIndex())
+ .lastLogTerm(lastLogId.getTerm())
+ .build();
+ this.rpcClientService.preVote(peer.getEndpoint(), done.request, done);
+ });
}
this.prevVoteCtx.grant(this.serverId);
if (this.prevVoteCtx.isGranted()) {
@@ -2761,7 +2830,6 @@ public class NodeImpl implements Node, RaftServerService {
@Override
public void shutdown(final Closure done) {
- List<RepeatedTimer> timers = null;
this.writeLock.lock();
try {
LOG.info("Node {} shutdown, currTerm={} state={}.", getNodeId(), this.currTerm, this.state);
@@ -2773,8 +2841,8 @@ public class NodeImpl implements Node, RaftServerService {
new Status(RaftError.ESHUTDOWN, "Raft node is going to quit."));
}
this.state = State.STATE_SHUTTING;
- // Stop all timers
- timers = stopAllTimers();
+ // Stop all pending timer callbacks.
+ stopAllTimers();
if (this.readOnlyService != null) {
this.readOnlyService.shutdown();
}
@@ -2806,9 +2874,6 @@ public class NodeImpl implements Node, RaftServerService {
event.shutdownLatch = latch;
}));
}
- if (!sharedTimerManager && this.timerManager != null) {
- this.timerManager.shutdown();
- }
}
if (this.state != State.STATE_SHUTDOWN) {
@@ -2827,11 +2892,6 @@ public class NodeImpl implements Node, RaftServerService {
}
finally {
this.writeLock.unlock();
-
- // Destroy all timers out of lock
- if (timers != null) {
- destroyAllTimers(timers);
- }
}
}
@@ -2885,6 +2945,58 @@ public class NodeImpl implements Node, RaftServerService {
if (this.fsmCaller != null) {
this.fsmCaller.join();
}
+
+ // Stop and reset non shared pools.
+ NodeOptions opts = getOptions();
+
+ if (opts.getScheduler() != null && !opts.isSharedPools()) {
+ opts.getScheduler().shutdown();
+ opts.setScheduler(null);
+ }
+ if (opts.getElectionTimer() != null && !opts.isSharedPools()) {
+ opts.getElectionTimer().stop();
+ opts.setElectionTimer(null);
+ }
+ if (opts.getVoteTimer() != null && !opts.isSharedPools()) {
+ opts.getVoteTimer().stop();
+ opts.setVoteTimer(null);
+ }
+ if (opts.getStepDownTimer() != null && !opts.isSharedPools()) {
+ opts.getStepDownTimer().stop();
+ opts.setStepDownTimer(null);
+ }
+ if (opts.getSnapshotTimer() != null && !opts.isSharedPools()) {
+ opts.getSnapshotTimer().stop();
+ opts.setSnapshotTimer(null);
+ }
+ if (opts.getCommonExecutor() != null && !opts.isSharedPools()) {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getCommonExecutor());
+ opts.setCommonExecutor(null);
+ }
+ if (opts.getStripedExecutor() != null && !opts.isSharedPools()) {
+ opts.getStripedExecutor().shutdownGracefully();
+ opts.setStripedExecutor(null);
+ }
+ if (opts.getClientExecutor() != null && !opts.isSharedPools()) {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(opts.getClientExecutor());
+ opts.setClientExecutor(null);
+ }
+ if (opts.getfSMCallerExecutorDisruptor() != null && !opts.isSharedPools()) {
+ opts.getfSMCallerExecutorDisruptor().shutdown();
+ opts.setfSMCallerExecutorDisruptor(null);
+ }
+ if (opts.getNodeApplyDisruptor() != null && !opts.isSharedPools()) {
+ opts.getNodeApplyDisruptor().shutdown();
+ opts.setNodeApplyDisruptor(null);
+ }
+ if (opts.getReadOnlyServiceDisruptor() != null && !opts.isSharedPools()) {
+ opts.getReadOnlyServiceDisruptor().shutdown();
+ opts.setReadOnlyServiceDisruptor(null);
+ }
+ if (opts.getLogManagerDisruptor() != null && !opts.isSharedPools()) {
+ opts.getLogManagerDisruptor().shutdown();
+ opts.setLogManagerDisruptor(null);
+ }
}
private static class StopTransferArg {
@@ -3235,7 +3347,7 @@ public class NodeImpl implements Node, RaftServerService {
LOG.info("Node {} starts to transfer leadership to peer {}.", getNodeId(), peer);
final StopTransferArg stopArg = new StopTransferArg(this, this.currTerm, peerId);
this.stopTransferArg = stopArg;
- this.transferTimer = this.timerManager.schedule(() -> onTransferTimeout(stopArg),
+ this.transferTimer = this.getOptions().getScheduler().schedule(() -> onTransferTimeout(stopArg),
this.options.getElectionTimeoutMs(), TimeUnit.MILLISECONDS);
}
@@ -3288,9 +3400,11 @@ public class NodeImpl implements Node, RaftServerService {
// Parallelize response and election
done.sendResponse(resp);
doUnlock = false;
+
+ LOG.info("Node {} received TimeoutNowRequest from {}, term={} and starts voting.", getNodeId(), request.serverId(),
+ savedTerm);
+
electSelf();
- LOG.info("Node {} received TimeoutNowRequest from {}, term={}.", getNodeId(), request.serverId(),
- savedTerm);
}
finally {
if (doUnlock) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 31e6f56..ae309ba 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -266,7 +266,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
this.fsmCaller.addLastAppliedLogIndexListener(this);
// start scanner.
- this.node.getTimerManager().scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
+ this.node.getOptions().getScheduler().scheduleAtFixedRate(() -> onApplied(this.fsmCaller.getLastAppliedIndex()),
this.raftOptions.getMaxElectionDelayMs(), this.raftOptions.getMaxElectionDelayMs(), TimeUnit.MILLISECONDS);
return true;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 14ef620..3d0c045 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -1026,7 +1026,7 @@ public class Replicator implements ThreadId.OnError {
this.catchUpClosure.getStatus().setError(code, RaftError.describeCode(code));
}
if (this.catchUpClosure.hasTimer()) {
- if (!beforeDestroy && !this.catchUpClosure.getTimer().cancel(true)) {
+ if (!beforeDestroy && !this.catchUpClosure.getTimer().cancel(false)) { // Avoid interrupting a thread in the pool.
// There's running timer task, let timer task trigger
// on_caught_up to void ABA problem
return;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
index 8a21fde..1fc3055 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/StateMachineAdapter.java
@@ -31,6 +31,8 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
* State machine adapter that implements all methods with default behavior except {@link #onApply(Iterator)}.
*/
public abstract class StateMachineAdapter implements StateMachine {
+ /** Term recorded by {@link #onLeaderStart(long)}; {@code -1} until a term has been recorded. */
+ protected volatile long leaderTerm = -1;
+
/** The logger */
private static final IgniteLogger LOG = IgniteLogger.forClass(StateMachineAdapter.class);
@@ -51,8 +53,13 @@ public abstract class StateMachineAdapter implements StateMachine {
return false;
}
+ /**
+ * Returns the leader term recorded by {@link #onLeaderStart(long)}.
+ *
+ * @return The recorded term, or {@code -1} if none has been recorded yet.
+ */
+ public long getLeaderTerm() {
+ return this.leaderTerm;
+ }
+
@Override
public void onLeaderStart(final long term) {
+ this.leaderTerm = term;
LOG.info("onLeaderStart: term={}.", term);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 632446e..5e61cc9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -23,13 +23,19 @@ import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.DefaultJRaftServiceFactory;
import org.apache.ignite.raft.jraft.core.ElectionPriority;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.core.Scheduler;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.util.Copiable;
import org.apache.ignite.raft.jraft.util.StringUtils;
import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.timer.Timer;
/**
* Node options.
@@ -40,8 +46,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
- // Default: 1000 (1s)
- private int electionTimeoutMs = 1000; // follower to candidate timeout
+ // Default: 1200 (1.2s)
+ private int electionTimeoutMs = 1200; // follower to candidate timeout
// One node's local priority value would be set to | electionPriority |
// value when it starts up.If this value is set to 0,the node will never be a leader.
@@ -119,11 +125,6 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
private boolean disableCli = false;
/**
- * Whether use global timer pool, if true, the {@code timerPoolSize} will be invalid.
- */
- private boolean sharedTimerPool = false;
-
- /**
* Timer manager thread pool size
*/
private int timerPoolSize = Math.min(Utils.cpus() * 3, 20);
@@ -154,56 +155,83 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
private SnapshotThrottle snapshotThrottle;
/**
- * Whether use global election timer TODO asch remove this https://issues.apache.org/jira/browse/IGNITE-14832
+ * Custom service factory.
*/
- private boolean sharedElectionTimer = false;
+ private JRaftServiceFactory serviceFactory = new DefaultJRaftServiceFactory();
/**
- * Whether use global vote timer TODO asch remove this https://issues.apache.org/jira/browse/IGNITE-14832
+ * Callbacks for replicator events.
*/
- private boolean sharedVoteTimer = false;
+ private List<Replicator.ReplicatorStateListener> replicationStateListeners;
/**
- * Whether use global step down timer
+ * The common executor for short running tasks.
*/
- private boolean sharedStepDownTimer = false;
+ private ExecutorService commonExecutor;
/**
- * Whether use global snapshot timer
+ * Striped executor for processing AppendEntries requests/responses.
*/
- private boolean sharedSnapshotTimer = false;
+ private FixedThreadsExecutorGroup stripedExecutor;
/**
- * Custom service factory.
+ * The scheduler to execute delayed jobs.
*/
- private JRaftServiceFactory serviceFactory = new DefaultJRaftServiceFactory();
+ private Scheduler scheduler;
/**
- *
+ * The election timer.
*/
- private List<Replicator.ReplicatorStateListener> replicationStateListeners;
+ private Timer electionTimer;
/**
- * The common executor for short running tasks.
+ * The vote timer.
*/
- private ExecutorService commonExecutor;
+ private Timer voteTimer;
/**
- * Striped executor for processing AppendEntries request/reponse.
+ * The snapshot timer.
*/
- private FixedThreadsExecutorGroup stripedExecutor;
+ private Timer snapshotTimer;
/**
- * The scheduler to execute delayed jobs.
+ * The step-down timer.
*/
- private Scheduler scheduler;
+ private Timer stepDownTimer;
- /** Server name. */
+ /**
+ * Server name.
+ */
private String serverName;
- /** Amount of Disruptors that will handle the RAFT server. */
+ /**
+ * Striped disruptor for the FSMCaller service. Its queues hold the tasks applied to the RAFT state machine.
+ */
+ private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+ /**
+ * Striped disruptor for Node apply service.
+ */
+ private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
+
+ /**
+ * Striped disruptor for Read only service.
+ */
+ private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+ /**
+ * Striped disruptor for Log manager service.
+ */
+ private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+ /**
+ * Amount of Disruptors that will handle the RAFT server.
+ */
private int stripes = DEFAULT_STRIPES;
+ /**
+ * Whether shared pools mode is enabled, i.e. thread pools are passed in these options instead of being
+ * created by the node.
+ */
+ private boolean sharedPools = false;
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
}
@@ -223,6 +251,24 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
}
/**
+ * Returns {@code true} if shared pools mode is in use.
+ *
+ * <p>In this mode thread pools are passed in the node options and node doesn't attempt to create/destroy any.
+ *
+ * @return {@code true} if shared pools mode is in use.
+ */
+ public boolean isSharedPools() {
+ return sharedPools;
+ }
+
+ /**
+ * Enables or disables shared pools mode.
+ *
+ * @param sharedPools {@code true} to enable shared pools mode, {@code false} to let the node manage its own pools.
+ */
+ public void setSharedPools(boolean sharedPools) {
+ this.sharedPools = sharedPools;
+ }
+
+ /**
* The rpc client.
*/
public JRaftServiceFactory getServiceFactory() {
@@ -278,14 +324,6 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
this.commonThreadPollSize = commonThreadPollSize;
}
- public boolean isSharedTimerPool() {
- return sharedTimerPool;
- }
-
- public void setSharedTimerPool(boolean sharedTimerPool) {
- this.sharedTimerPool = sharedTimerPool;
- }
-
public int getTimerPoolSize() {
return this.timerPoolSize;
}
@@ -435,60 +473,60 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
this.disableCli = disableCli;
}
- public boolean isSharedElectionTimer() {
- return sharedElectionTimer;
+ /** Sets the common executor for short running tasks. */
+ public void setCommonExecutor(ExecutorService commonExecutor) {
+ this.commonExecutor = commonExecutor;
}
- public void setSharedElectionTimer(boolean sharedElectionTimer) {
- this.sharedElectionTimer = sharedElectionTimer;
+ /** Returns the common executor for short running tasks, {@code null} if not set. */
+ public ExecutorService getCommonExecutor() {
+ return commonExecutor;
}
- public boolean isSharedVoteTimer() {
- return sharedVoteTimer;
+ /** Returns the striped executor for AppendEntries processing, {@code null} if not set. */
+ public FixedThreadsExecutorGroup getStripedExecutor() {
+ return stripedExecutor;
}
- public void setSharedVoteTimer(boolean sharedVoteTimer) {
- this.sharedVoteTimer = sharedVoteTimer;
+ /** Sets the striped executor for AppendEntries processing. */
+ public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
+ this.stripedExecutor = stripedExecutor;
}
- public boolean isSharedStepDownTimer() {
- return sharedStepDownTimer;
+ /** Returns the scheduler for delayed jobs, {@code null} if not set. */
+ public Scheduler getScheduler() {
+ return scheduler;
}
- public void setSharedStepDownTimer(boolean sharedStepDownTimer) {
- this.sharedStepDownTimer = sharedStepDownTimer;
+ /** Sets the scheduler for delayed jobs. */
+ public void setScheduler(Scheduler scheduler) {
+ this.scheduler = scheduler;
}
- public boolean isSharedSnapshotTimer() {
- return sharedSnapshotTimer;
+ /** Returns the election timer, {@code null} if not set. */
+ public Timer getElectionTimer() {
+ return electionTimer;
}
- public void setSharedSnapshotTimer(boolean sharedSnapshotTimer) {
- this.sharedSnapshotTimer = sharedSnapshotTimer;
+ /** Sets the election timer. */
+ public void setElectionTimer(Timer electionTimer) {
+ this.electionTimer = electionTimer;
}
- public void setCommonExecutor(ExecutorService commonExecutor) {
- this.commonExecutor = commonExecutor;
+ /** Returns the vote timer, {@code null} if not set. */
+ public Timer getVoteTimer() {
+ return voteTimer;
}
- public ExecutorService getCommonExecutor() {
- return commonExecutor;
+ /** Sets the vote timer. */
+ public void setVoteTimer(Timer voteTimer) {
+ this.voteTimer = voteTimer;
}
- public FixedThreadsExecutorGroup getStripedExecutor() {
- return stripedExecutor;
+ /** Returns the snapshot timer, {@code null} if not set. */
+ public Timer getSnapshotTimer() {
+ return snapshotTimer;
}
- public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
- this.stripedExecutor = stripedExecutor;
+ /** Sets the snapshot timer. */
+ public void setSnapshotTimer(Timer snapshotTimer) {
+ this.snapshotTimer = snapshotTimer;
}
- public Scheduler getScheduler() {
- return scheduler;
+ /** Returns the step-down timer, {@code null} if not set. */
+ public Timer getStepDownTimer() {
+ return stepDownTimer;
}
- public void setScheduler(Scheduler scheduler) {
- this.scheduler = scheduler;
+ /** Sets the step-down timer. */
+ public void setStepDownTimer(Timer stepDownTimer) {
+ this.stepDownTimer = stepDownTimer;
}
public String getServerName() {
@@ -499,6 +537,38 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
this.serverName = serverName;
}
+ /** Returns the striped disruptor for the FSMCaller service, {@code null} if not set. */
+ public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+ return fSMCallerExecutorDisruptor;
+ }
+
+ /** Sets the striped disruptor for the FSMCaller service. */
+ public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+ this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
+ }
+
+ /** Returns the striped disruptor for the node apply service, {@code null} if not set. */
+ public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
+ return nodeApplyDisruptor;
+ }
+
+ /** Sets the striped disruptor for the node apply service. */
+ public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
+ this.nodeApplyDisruptor = nodeApplyDisruptor;
+ }
+
+ /** Returns the striped disruptor for the read-only service, {@code null} if not set. */
+ public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+ return readOnlyServiceDisruptor;
+ }
+
+ /** Sets the striped disruptor for the read-only service. */
+ public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+ this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+ }
+
+ /** Returns the striped disruptor for the log manager service, {@code null} if not set. */
+ public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+ return logManagerDisruptor;
+ }
+
+ /** Sets the striped disruptor for the log manager service. */
+ public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+ this.logManagerDisruptor = logManagerDisruptor;
+ }
+
@Override
public NodeOptions copy() {
final NodeOptions nodeOptions = new NodeOptions();
@@ -510,17 +580,12 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
nodeOptions.setCatchupMargin(this.catchupMargin);
nodeOptions.setFilterBeforeCopyRemote(this.filterBeforeCopyRemote);
nodeOptions.setDisableCli(this.disableCli);
- nodeOptions.setSharedTimerPool(this.sharedTimerPool);
nodeOptions.setTimerPoolSize(this.timerPoolSize);
nodeOptions.setCliRpcThreadPoolSize(this.cliRpcThreadPoolSize);
nodeOptions.setRaftRpcThreadPoolSize(this.raftRpcThreadPoolSize);
nodeOptions.setCommonThreadPollSize(this.commonThreadPollSize);
nodeOptions.setEnableMetrics(this.enableMetrics);
nodeOptions.setRaftOptions(this.raftOptions.copy());
- nodeOptions.setSharedElectionTimer(this.sharedElectionTimer);
- nodeOptions.setSharedVoteTimer(this.sharedVoteTimer);
- nodeOptions.setSharedStepDownTimer(this.sharedStepDownTimer);
- nodeOptions.setSharedSnapshotTimer(this.sharedSnapshotTimer);
nodeOptions.setReplicationStateListeners(this.replicationStateListeners);
nodeOptions.setCommonExecutor(this.getCommonExecutor());
nodeOptions.setStripedExecutor(this.getStripedExecutor());
@@ -531,6 +596,13 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor());
nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor());
nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor());
+ nodeOptions.setElectionTimer(this.getElectionTimer());
+ nodeOptions.setVoteTimer(this.getVoteTimer());
+ nodeOptions.setSnapshotTimer(this.getSnapshotTimer());
+ nodeOptions.setStepDownTimer(this.getStepDownTimer());
+ nodeOptions.setSharedPools(this.isSharedPools());
+ nodeOptions.setRpcDefaultTimeout(this.getRpcDefaultTimeout());
+ nodeOptions.setRpcConnectTimeoutMs(this.getRpcConnectTimeoutMs());
return nodeOptions;
}
@@ -543,11 +615,9 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
+ snapshotLogIndexMargin + ", catchupMargin=" + catchupMargin + ", initialConf=" + initialConf
+ ", fsm=" + fsm + ", logUri='" + logUri + '\'' + ", raftMetaUri='" + raftMetaUri + '\''
+ ", snapshotUri='" + snapshotUri + '\'' + ", filterBeforeCopyRemote=" + filterBeforeCopyRemote
- + ", disableCli=" + disableCli + ", sharedTimerPool=" + sharedTimerPool + ", timerPoolSize="
+ + ", disableCli=" + disableCli + ", timerPoolSize="
+ timerPoolSize + ", cliRpcThreadPoolSize=" + cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
+ raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle
- + ", sharedElectionTimer=" + sharedElectionTimer + ", sharedVoteTimer=" + sharedVoteTimer
- + ", sharedStepDownTimer=" + sharedStepDownTimer + ", sharedSnapshotTimer=" + sharedSnapshotTimer
+ ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
index 7452233..9807120 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
@@ -16,42 +16,41 @@
*/
package org.apache.ignite.raft.jraft.option;
-import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.ExecutorService;
+import com.codahale.metrics.MetricRegistry;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
-import org.apache.ignite.raft.jraft.core.NodeImpl;
-import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.rpc.RpcClient;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
public class RpcOptions {
/** Raft message factory. */
private RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
/**
- * Rpc handshake timeout in milliseconds Default: 2000(1s)
+ * Rpc connect timeout in milliseconds.
+ * Default: 1200 (1.2s)
*/
- private int rpcConnectTimeoutMs = 2000; // TODO asch rename to handshake timeout IGNITE-14832.
+ private int rpcConnectTimeoutMs = 1200;
/**
- * RPC request default timeout in milliseconds Default: 5000(5s)
+ * RPC request default timeout in milliseconds.
+ * Default: 5000 (5s)
*/
private int rpcDefaultTimeout = 5000;
/**
- * Install snapshot RPC request default timeout in milliseconds Default: 5 * 60 * 1000(5min)
+ * Install snapshot RPC request default timeout in milliseconds.
+ * Default: 5m
*/
private int rpcInstallSnapshotTimeout = 5 * 60 * 1000;
/**
- * RPC process thread pool size Default: 80
+ * RPC process thread pool size.
+ * Default: 80
*/
private int rpcProcessorThreadPoolSize = 80;
/**
- * Whether to enable checksum for RPC. Default: false
+ * Whether to enable checksum for RPC.
*/
private boolean enableRpcChecksum = false;
@@ -65,50 +64,6 @@ public class RpcOptions {
*/
private ExecutorService clientExecutor;
- /** Striped disruptor for FSMCaller service. The queue serves of an Append entry requests in the RAFT state machine. */
- private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
-
- /** Striped disruptor for Node apply service. */
- private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
-
- /** Striped disruptor for Read only service. */
- private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
-
- /** Striped disruptor for Log manager service. */
- private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
-
- public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
- return fSMCallerExecutorDisruptor;
- }
-
- public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
- this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
- }
-
- public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
- return nodeApplyDisruptor;
- }
-
- public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
- this.nodeApplyDisruptor = nodeApplyDisruptor;
- }
-
- public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
- return readOnlyServiceDisruptor;
- }
-
- public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
- this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
- }
-
- public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
- return logManagerDisruptor;
- }
-
- public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
- this.logManagerDisruptor = logManagerDisruptor;
- }
-
/**
* Metric registry for RPC services, user should not use this field.
*/
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
index 113de32..fe28df4 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ClientService.java
@@ -16,7 +16,7 @@
*/
package org.apache.ignite.raft.jraft.rpc;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.raft.jraft.Lifecycle;
import org.apache.ignite.raft.jraft.option.RpcOptions;
import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -26,12 +26,20 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
*/
public interface ClientService extends Lifecycle<RpcOptions> {
/**
- * Connect to endpoint, returns true when success. TODO asch rename to isAlive IGNITE-14832.
+ * Connects to the endpoint, returning true on success. TODO asch: this method may be unnecessary, see IGNITE-14832.
*
* @param endpoint server address
* @return true on connect success
*/
boolean connect(final Endpoint endpoint);
+
+ /**
+ * Connects to the endpoint asynchronously; the returned future completes with true on success, false otherwise.
+ *
+ * @param endpoint server address
+ * @return The future with the result.
+ */
+ CompletableFuture<Boolean> connectAsync(final Endpoint endpoint);
/**
* Send a requests and waits for response with callback, returns the request future.
@@ -42,6 +50,6 @@ public interface ClientService extends Lifecycle<RpcOptions> {
* @param timeoutMs timeout millis
* @return a future with operation result
*/
- <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index b0c0240..792f621 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -18,10 +18,10 @@ package org.apache.ignite.raft.jraft.rpc.impl;
import java.net.ConnectException;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyEventHandler;
@@ -106,69 +106,71 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
@Override
public boolean connect(final Endpoint endpoint) {
+ try {
+ return connectAsync(endpoint).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ LOG.error("Interrupted while connecting to {}, exception: {}.", endpoint, e.getMessage());
+ } catch (ExecutionException e) {
+ LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
+ }
+
+ return false;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> connectAsync(Endpoint endpoint) {
final RpcClient rc = this.rpcClient;
if (rc == null) {
throw new IllegalStateException("Client service is uninitialized.");
}
-
+
// Remote node is alive and pinged, safe to continue.
- if (readyAddresses.contains(endpoint.toString()))
- return true;
-
- // Remote node must be pinged to make sure listeners are set.
- synchronized (this) {
- if (readyAddresses.contains(endpoint.toString()))
+ if (readyAddresses.contains(endpoint.toString())) {
+ return CompletableFuture.completedFuture(true);
+ }
+
+ final RpcRequests.PingRequest req = rpcOptions.getRaftMessagesFactory()
+ .pingRequest()
+ .sendTimestamp(System.currentTimeMillis())
+ .build();
+
+ CompletableFuture<Message> fut =
+ invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
+
+ return fut.thenApply(msg -> {
+ ErrorResponse resp = (ErrorResponse) msg;
+
+ if (resp != null && resp.errorCode() == 0) {
+ readyAddresses.add(endpoint.toString());
+
return true;
-
- try {
- final RpcRequests.PingRequest req = rpcOptions.getRaftMessagesFactory()
- .pingRequest()
- .sendTimestamp(System.currentTimeMillis())
- .build();
-
- Future<Message> fut =
- invokeWithDone(endpoint, req, null, null, rpcOptions.getRpcConnectTimeoutMs(), rpcExecutor);
-
- final ErrorResponse resp = (ErrorResponse) fut.get(); // Future will be certainly terminated by timeout.
-
- if (resp != null && resp.errorCode() == 0) {
- readyAddresses.add(endpoint.toString());
-
- return true;
- }
- else
- return false;
- }
- catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
+ } else {
+ return false;
}
- catch (final ExecutionException e) {
- LOG.error("Fail to connect {}, exception: {}.", endpoint, e.getMessage());
- }
- }
-
- return false;
+ });
}
-
+
@Override
- public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs) {
return invokeWithDone(endpoint, request, done, timeoutMs, this.rpcExecutor);
}
- public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
return invokeWithDone(endpoint, request, null, done, timeoutMs, rpcExecutor);
}
- public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs) {
return invokeWithDone(endpoint, request, ctx, done, timeoutMs, this.rpcExecutor);
}
- public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoint, final Message request,
+ public <T extends Message> CompletableFuture<Message> invokeWithDone(final Endpoint endpoint, final Message request,
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
index dee12eb..1562188 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcClient.java
@@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiPredicate;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
@@ -41,6 +43,8 @@ import org.apache.ignite.raft.jraft.util.Utils;
import static org.apache.ignite.raft.jraft.JRaftUtils.addressFromEndpoint;
public class IgniteRpcClient implements RpcClientEx {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcClient.class);
+
private volatile BiPredicate<Object, String> recordPred;
private BiPredicate<Object, String> blockPred;
@@ -108,13 +112,17 @@ public class IgniteRpcClient implements RpcClientEx {
synchronized (this) {
if (blockPred != null && blockPred.test(request, endpoint.toString())) {
- blockedMsgs.add(new Object[] {
- request,
- endpoint.toString(),
- fut.hashCode(),
- System.currentTimeMillis(),
- (Runnable)() -> send(endpoint, request, fut, timeoutMs)
- });
+ Object[] msgData = {
+ request,
+ endpoint.toString(),
+ fut.hashCode(),
+ System.currentTimeMillis(),
+ (Runnable) () -> send(endpoint, request, fut, timeoutMs)
+ };
+
+ blockedMsgs.add(msgData);
+
+ LOG.info("Blocked message to={} id={} msg={}", endpoint.toString(), msgData[2], S.toString(request));
return fut;
}
@@ -163,7 +171,9 @@ public class IgniteRpcClient implements RpcClientEx {
for (Object[] msg : msgs) {
Runnable r = (Runnable) msg[4];
-
+
+ LOG.info("Unblocked message to={} id={} msg={}", msg[1], msg[2], S.toString(msg[0]));
+
r.run();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
index 2a39de3..ac2c6a7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/IgniteRpcServer.java
@@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
@@ -56,6 +58,8 @@ import org.apache.ignite.raft.jraft.rpc.impl.core.TimeoutNowRequestProcessor;
* TODO https://issues.apache.org/jira/browse/IGNITE-14519 Unsubscribe on shutdown
*/
public class IgniteRpcServer implements RpcServer<Void> {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteRpcServer.class);
+
private final ClusterService service;
private final NodeManager nodeManager;
@@ -91,7 +95,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
registerProcessor(new GetFileRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new InstallSnapshotRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new RequestVoteRequestProcessor(rpcExecutor, raftMessagesFactory));
- registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory)); // TODO asch this should go last.
+ registerProcessor(new PingRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new TimeoutNowRequestProcessor(rpcExecutor, raftMessagesFactory));
registerProcessor(new ReadIndexRequestProcessor(rpcExecutor, raftMessagesFactory));
// raft native cli service
@@ -111,6 +115,7 @@ public class IgniteRpcServer implements RpcServer<Void> {
var messageHandler = new RpcMessageHandler();
+ // Add the handler after all processors are set up.
service.messagingService().addMessageHandler(RaftMessageGroup.class, messageHandler);
service.topologyService().addEventHandler(new TopologyEventHandler() {
@@ -185,7 +190,8 @@ public class IgniteRpcServer implements RpcServer<Void> {
finalPrc.handleRequest(context, message);
});
} catch (RejectedExecutionException e) {
- // Node is stopping.
+ // The rejection is ok if an executor has been stopped, otherwise it shouldn't happen.
+ LOG.warn("A request execution was rejected [sender={} req={} reason={}]", senderAddr, S.toString(message), e.getMessage());
}
}
}
@@ -217,5 +223,6 @@ public class IgniteRpcServer implements RpcServer<Void> {
/** {@inheritDoc} */
@Override public void shutdown() {
+ // TODO: deregister the message and topology listeners on shutdown (IGNITE-14519).
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
index a653dc2..6bd0c6f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/PingRequestProcessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.rpc.impl;
import java.util.concurrent.Executor;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite.raft.jraft.rpc.RpcContext;
@@ -27,6 +28,8 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.PingRequest;
* Ping request processor.
*/
public class PingRequestProcessor implements RpcProcessor<PingRequest> {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(PingRequestProcessor.class);
+
/** The executor */
private final Executor executor;
@@ -44,6 +47,8 @@ public class PingRequestProcessor implements RpcProcessor<PingRequest> {
/** {@inheritDoc} */
@Override
public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
+ LOG.debug("Pinged from={}", rpcCtx.getRemoteAddress());
+
rpcCtx.sendResponse(RaftRpcFactory.DEFAULT.newResponse(msgFactory, 0, "OK"));
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 4593058..1feac60 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -912,7 +912,7 @@ public class LogManagerImpl implements LogManager {
this.readLock.unlock();
}
try {
- c.await();
+ c.await(); // TODO FIXME asch https://issues.apache.org/jira/browse/IGNITE-15974
}
catch (final InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 280c890..4097417 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -651,7 +651,7 @@ public class SnapshotExecutorImpl implements SnapshotExecutor {
final SnapshotCopierOptions copierOpts = new SnapshotCopierOptions();
copierOpts.setNodeOptions(this.node.getOptions());
copierOpts.setRaftClientService(this.node.getRpcClientService());
- copierOpts.setTimerManager(this.node.getTimerManager());
+ copierOpts.setTimerManager(this.node.getOptions().getScheduler());
copierOpts.setRaftOptions(this.node.getRaftOptions());
return copierOpts;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
index 07637fe..7b20039 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/timer/DefaultTimer.java
@@ -91,7 +91,7 @@ public class DefaultTimer implements Timer {
@Override
public boolean cancel() {
final ScheduledFuture<?> f = future;
- return f != null && f.cancel(true);
+ return f != null && f.cancel(false); // Avoid interrupting a thread in the pool.
}
};
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 9672e1b..f06911a 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -193,13 +193,13 @@ public class RaftServerImpl implements RaftServer {
public @Nullable Peer localPeer(String groupId) {
return new Peer(service.topologyService().localMember().address());
}
-
+
/** {@inheritDoc} */
@Override
public Set<String> startedGroups() {
return listeners.keySet();
}
-
+
/**
* Handle action request.
*
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 056e686..8033ffc 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -45,7 +45,6 @@ public class MockStateMachine extends StateMachineAdapter {
private final Lock lock = new ReentrantLock();
private volatile int onStartFollowingTimes = 0;
private volatile int onStopFollowingTimes = 0;
- private volatile long leaderTerm = -1;
private volatile long appliedIndex = -1;
private volatile long snapshotIndex = -1L;
private final List<ByteBuffer> logs = new ArrayList<>();
@@ -78,10 +77,6 @@ public class MockStateMachine extends StateMachineAdapter {
return this.onStopFollowingTimes;
}
- public long getLeaderTerm() {
- return this.leaderTerm;
- }
-
public long getAppliedIndex() {
return this.appliedIndex;
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 05af451..f098a6c 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -104,7 +104,6 @@ public class ReadOnlyServiceTest {
nodeOptions.setScheduler(scheduler);
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
Mockito.when(this.node.getGroupId()).thenReturn("test");
- Mockito.when(this.node.getTimerManager()).thenReturn(nodeOptions.getScheduler());
Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
Mockito.when(this.node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost:8081", 0)));
Mockito.when(this.node.getServerId()).thenReturn(new PeerId("localhost:8081", 0));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index b24c08d..87b0425 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -22,13 +22,11 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -48,18 +46,16 @@ import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.conf.Configuration;
-import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
-import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
-import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
+import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
@@ -76,9 +72,9 @@ public class TestCluster {
/**
* Default election timeout.
* Important: due to sync disk ops (writing raft meta) during probe request processing this timeout should be high
- * enough to avoid test flakiness.
+ * enough to avoid test flakiness. The test environment may also introduce additional instability.
*/
- private static final int ELECTION_TIMEOUT_MILLIS = 600;
+ private static final int ELECTION_TIMEOUT_MILLIS = 1000;
private static final IgniteLogger LOG = IgniteLogger.forClass(TestCluster.class);
@@ -95,32 +91,6 @@ public class TestCluster {
/** Test info. */
private final TestInfo testInfo;
- /**
- * These disruptors will be used for all RAFT servers in the cluster.
- */
- private final HashMap<Endpoint, StripedDisruptor<FSMCallerImpl.ApplyTask>> fsmCallerDusruptors = new HashMap<>();
-
- /**
- * These disruptors will be used for all RAFT servers in the cluster.
- */
- private final HashMap<Endpoint, StripedDisruptor<NodeImpl.LogEntryAndClosure>> nodeDisruptors = new HashMap<>();
-
- /**
- * These disruptors will be used for all RAFT servers in the cluster.
- */
- private final HashMap<Endpoint, StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>> readOnlyServiceDisruptors = new HashMap<>();
-
- /**
- * These disruptors will be used for all RAFT servers in the cluster.
- */
- private final HashMap<Endpoint, StripedDisruptor<LogManagerImpl.StableClosureEvent>> logManagerDisruptors = new HashMap<>();
-
- private List<ExecutorService> executors = new CopyOnWriteArrayList<>();
-
- private List<FixedThreadsExecutorGroup> fixedThreadsExecutorGroups = new CopyOnWriteArrayList<>();
-
- private List<Scheduler> schedulers = new CopyOnWriteArrayList<>();
-
private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory();
private LinkedHashSet<PeerId> learners;
@@ -236,23 +206,11 @@ public class TestCluster {
return true;
}
+ // Start the node in non-shared pools mode: the node creates and manages its own pools.
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setServerName(listenAddr.toString());
- ExecutorService executor = JRaftUtils.createCommonExecutor(nodeOptions);
- executors.add(executor);
- nodeOptions.setCommonExecutor(executor);
- FixedThreadsExecutorGroup threadsExecutorGroup = JRaftUtils.createAppendEntriesExecutor(nodeOptions);
- fixedThreadsExecutorGroups.add(threadsExecutorGroup);
- nodeOptions.setStripedExecutor(threadsExecutorGroup);
- ExecutorService clientExecutor = JRaftUtils.createClientExecutor(nodeOptions, nodeOptions.getServerName());
- executors.add(clientExecutor);
- nodeOptions.setClientExecutor(clientExecutor);
- Scheduler scheduler = JRaftUtils.createScheduler(nodeOptions);
- schedulers.add(scheduler);
- nodeOptions.setScheduler(scheduler);
-
nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
nodeOptions.setEnableMetrics(enableMetrics);
nodeOptions.setSnapshotThrottle(snapshotThrottle);
@@ -267,31 +225,16 @@ public class TestCluster {
nodeOptions.setRaftMetaUri(serverDataPath + File.separator + "meta");
nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
nodeOptions.setElectionPriority(priority);
-
- nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
- "JRaft-FSMCaller-Disruptor_TestCluster",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new FSMCallerImpl.ApplyTask(),
- nodeOptions.getStripes())));
-
- nodeOptions.setNodeApplyDisruptor(nodeDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
- "JRaft-NodeImpl-Disruptor_TestCluster",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new NodeImpl.LogEntryAndClosure(),
- nodeOptions.getStripes())));
-
- nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
- "JRaft-ReadOnlyService-Disruptor_TestCluster",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
- nodeOptions.getStripes())));
-
- nodeOptions.setLogManagerDisruptor(logManagerDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
- "JRaft-LogManager-Disruptor_TestCluster",
- nodeOptions.getRaftOptions().getDisruptorBufferSize(),
- () -> new LogManagerImpl.StableClosureEvent(),
- nodeOptions.getStripes())));
-
+
+ // Align rpc options with election timeout.
+ nodeOptions.setRpcConnectTimeoutMs(this.electionTimeoutMs / 3);
+ nodeOptions.setRpcDefaultTimeout(this.electionTimeoutMs / 2);
+
+ // Reduce default threads count per test node.
+ nodeOptions.setRaftRpcThreadPoolSize(Utils.cpus());
+ nodeOptions.setTimerPoolSize(Utils.cpus() * 2);
+ nodeOptions.setRpcProcessorThreadPoolSize(Utils.cpus() * 3);
+
MockStateMachine fsm = new MockStateMachine(listenAddr);
nodeOptions.setFsm(fsm);
@@ -319,8 +262,6 @@ public class TestCluster {
ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
- executors.add(requestExecutor);
-
var rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions, requestExecutor);
clusterService.start();
@@ -331,10 +272,15 @@ public class TestCluster {
RaftGroupService server = new RaftGroupService(this.name, new PeerId(listenAddr, 0, priority),
nodeOptions, rpcServer, nodeManager) {
@Override public synchronized void shutdown() {
- super.shutdown();
-
+ // This stop order is consistent with JRaftServerImpl
rpcServer.shutdown();
+ ExecutorServiceHelper.shutdownAndAwaitTermination(requestExecutor);
+
+ super.shutdown();
+
+ // The network service must be stopped after the node, because raft sends a TimeoutNowRequest on stop
+ // to speed up leader election.
clusterService.stop();
}
};
@@ -402,14 +348,6 @@ public class TestCluster {
List<Endpoint> addrs = getAllNodes();
for (Endpoint addr : addrs)
stop(addr);
-
- fsmCallerDusruptors.values().forEach(StripedDisruptor::shutdown);
- nodeDisruptors.values().forEach(StripedDisruptor::shutdown);
- readOnlyServiceDisruptors.values().forEach(StripedDisruptor::shutdown);
- logManagerDisruptors.values().forEach(StripedDisruptor::shutdown);
- executors.forEach(ExecutorServiceHelper::shutdownAndAwaitTermination);
- fixedThreadsExecutorGroups.forEach(FixedThreadsExecutorGroup::shutdownGracefully);
- schedulers.forEach(Scheduler::shutdown);
}
public void clean(Endpoint listenAddr) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
index e650e03..60fbd2e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/SnapshotExecutorTest.java
@@ -121,9 +121,9 @@ public class SnapshotExecutorTest extends BaseStorageTest {
Mockito.when(node.getRaftOptions()).thenReturn(new RaftOptions());
options = new NodeOptions();
options.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
+ options.setScheduler(timerManager);
Mockito.when(node.getOptions()).thenReturn(options);
Mockito.when(node.getRpcClientService()).thenReturn(raftClientService);
- Mockito.when(node.getTimerManager()).thenReturn(timerManager);
Mockito.when(node.getServiceFactory()).thenReturn(new DefaultJRaftServiceFactory());
executor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
index 0cc551e..c4dbcf2 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/test/TestUtils.java
@@ -40,6 +40,9 @@ import org.apache.ignite.raft.jraft.util.Endpoint;
import org.mockito.ArgumentCaptor;
import static java.lang.Thread.sleep;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test helper
@@ -183,4 +186,14 @@ public class TestUtils {
}
}, timeout);
}
+
+ /**
+ * Waits (with a timeout) until all JRaft threads are stopped; otherwise fails with an assertion error listing the remaining JRaft threads.
+ */
+ public static void assertAllJraftThreadsStopped() {
+ assertTrue(waitForCondition(() -> Thread.getAllStackTraces().keySet().stream().
+ noneMatch(t -> t.getName().contains("JRaft")), 5_000),
+ Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().contains("JRaft")).
+ sorted(comparing(Thread::getName)).collect(toList()).toString());
+ }
}