You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/08/03 11:44:20 UTC
[ignite-3] branch main updated: IGNITE-14841 Use shared distruptor
pool for multiple raft group nodes. Fixes #207
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7a09d67 IGNITE-14841 Use shared distruptor pool for multiple raft group nodes. Fixes #207
7a09d67 is described below
commit 7a09d67231090d29533d27bda845b9b967d3db86
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Aug 3 14:43:36 2021 +0300
IGNITE-14841 Use shared distruptor pool for multiple raft group nodes. Fixes #207
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../apache/ignite/raft/jraft/core/ITNodeTest.java | 39 ++-
.../raft/server/ITJRaftCounterServerTest.java | 58 ++++-
.../internal/raft/server/impl/JRaftServerImpl.java | 49 ++++
.../org/apache/ignite/raft/jraft/JRaftUtils.java | 46 +++-
.../ignite/raft/jraft/core/FSMCallerImpl.java | 69 ++---
.../apache/ignite/raft/jraft/core/NodeImpl.java | 75 +++---
.../raft/jraft/core/ReadOnlyServiceImpl.java | 61 +++--
.../{util => disruptor}/DisruptorBuilder.java | 9 +-
.../ignite/raft/jraft/disruptor/GroupAware.java | 31 +++
.../raft/jraft/disruptor/StripedDisruptor.java | 282 +++++++++++++++++++++
.../ignite/raft/jraft/option/FSMCallerOptions.java | 26 +-
.../raft/jraft/option/LogManagerOptions.java | 31 ++-
.../ignite/raft/jraft/option/NodeOptions.java | 24 ++
.../raft/jraft/option/ReadOnlyServiceOptions.java | 21 ++
.../ignite/raft/jraft/option/RpcOptions.java | 51 +++-
.../raft/jraft/storage/impl/LogManagerImpl.java | 67 +++--
.../raft/jraft/util/LogExceptionHandler.java | 64 -----
.../ignite/raft/jraft/core/FSMCallerTest.java | 13 +-
.../raft/jraft/core/ReadOnlyServiceTest.java | 10 +
.../apache/ignite/raft/jraft/core/TestCluster.java | 52 ++++
.../raft/jraft/storage/impl/LogManagerTest.java | 13 +-
21 files changed, 858 insertions(+), 233 deletions(-)
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index f325312..2301939 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft.core;
+import com.codahale.metrics.ConsoleReporter;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.file.Path;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
-import com.codahale.metrics.ConsoleReporter;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
@@ -60,6 +60,7 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
@@ -79,6 +80,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -3386,6 +3388,36 @@ public class ITNodeTest {
*/
private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
Configuration initialConf = nodeOptions.getInitialConf();
+ nodeOptions.setStripes(1);
+
+ StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
+ StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
+ StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+ StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+ nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
+ "JRaft-FSMCaller-Disruptor_ITNodeTest",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new FSMCallerImpl.ApplyTask(),
+ nodeOptions.getStripes()));
+
+ nodeOptions.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
+ "JRaft-NodeImpl-Disruptor_ITNodeTest",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new NodeImpl.LogEntryAndClosure(),
+ nodeOptions.getStripes()));
+
+ nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
+ "JRaft-ReadOnlyService-Disruptor_ITNodeTest",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ nodeOptions.getStripes()));
+
+ nodeOptions.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
+ "JRaft-LogManager-Disruptor_ITNodeTest",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new LogManagerImpl.StableClosureEvent(),
+ nodeOptions.getStripes()));
Stream<PeerId> peers = initialConf == null ?
Stream.empty() :
@@ -3411,6 +3443,11 @@ public class ITNodeTest {
super.shutdown();
clusterService.stop();
+
+ fsmCallerDusruptor.shutdown();
+ nodeDisruptor.shutdown();
+ readOnlyServiceDisruptor.shutdown();
+ logManagerDisruptor.shutdown();
}
};
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index 7a4ac4c..b541005 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -23,6 +23,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -45,6 +46,8 @@ import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@@ -109,7 +112,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
/**
* Servers list.
*/
- protected final List<JRaftServerImpl> servers = new ArrayList<>();
+ private final List<JRaftServerImpl> servers = new ArrayList<>();
/**
* Clients list.
@@ -230,6 +233,53 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
}
/**
+ * Checks that the number of Disruptor threads does not depend on count started RAFT nodes.
+ */
+ @Test
+ public void testDisruptorThreadsCount() {
+ startServer(0, raftServer -> {
+ raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
+ });
+
+ Set<Thread> threads = getAllDisruptoCurrentThreads();
+
+ int threadsBefore = threads.size();
+
+ Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+
+ assertEquals(NodeOptions.DEFAULT_STRIPES * 4/*services*/, threadsBefore, "Started thread names: " + threadNamesBefore);
+
+ servers.forEach(srv -> {
+ for (int i = 0; i < 10; i++)
+ srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF);
+ });
+
+ threads = getAllDisruptoCurrentThreads();
+
+ int threadsAfter = threads.size();
+
+ Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+
+ threadNamesAfter.removeAll(threadNamesBefore);
+
+ assertEquals(threadsBefore, threadsAfter, "Difference: " + threadNamesAfter);
+ }
+
+ /**
+ * Get a set of Disruptor threads for the well known JRaft services.
+ *
+ * @return Set of Disruptor threads.
+ */
+ @NotNull private Set<Thread> getAllDisruptoCurrentThreads() {
+ return Thread.getAllStackTraces().keySet().stream().filter(t ->
+ t.getName().contains("JRaft-FSMCaller-Disruptor") ||
+ t.getName().contains("JRaft-NodeImpl-Disruptor") ||
+ t.getName().contains("JRaft-ReadOnlyService-Disruptor") ||
+ t.getName().contains("JRaft-LogManager-Disruptor"))
+ .collect(Collectors.toSet());
+ }
+
+ /**
*
*/
@Test
@@ -420,7 +470,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
client1.refreshLeader().get();
client2.refreshLeader().get();
- NodeImpl leader = servers.stream().map(s -> ((NodeImpl) s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
+ NodeImpl leader = servers.stream().map(s -> ((NodeImpl)s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
filter(n -> n.getState() == STATE_LEADER).findFirst().orElse(null);
assertNotNull(leader);
@@ -611,8 +661,8 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(groupId);
JRaftServerImpl.DelegatingStateMachine fsm0 =
- (JRaftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
+ (JRaftServerImpl.DelegatingStateMachine)svc.getRaftNode().getOptions().getFsm();
- return expected == ((CounterListener) fsm0.getListener()).value();
+ return expected == ((CounterListener)fsm0.getListener()).value();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 840c6db..b383b26 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -43,12 +43,17 @@ import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
import org.apache.ignite.raft.jraft.util.JDKMarshaller;
@@ -127,6 +132,38 @@ public class JRaftServerImpl implements RaftServer {
JRaftUtils.createRequestExecutor(opts)
);
+ if (opts.getfSMCallerExecutorDisruptor() == null) {
+ opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
+ "JRaft-FSMCaller-Disruptor",
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new FSMCallerImpl.ApplyTask(),
+ opts.getStripes()));
+ }
+
+ if (opts.getNodeApplyDisruptor() == null) {
+ opts.setNodeApplyDisruptor(new StripedDisruptor<NodeImpl.LogEntryAndClosure>(
+ "JRaft-NodeImpl-Disruptor",
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new NodeImpl.LogEntryAndClosure(),
+ opts.getStripes()));
+ }
+
+ if (opts.getReadOnlyServiceDisruptor() == null) {
+ opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(
+ "JRaft-ReadOnlyService-Disruptor",
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ opts.getStripes()));
+ }
+
+ if (opts.getLogManagerDisruptor() == null) {
+ opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(
+ "JRaft-LogManager-Disruptor",
+ opts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new LogManagerImpl.StableClosureEvent(),
+ opts.getStripes()));
+ }
+
rpcServer.init(null);
}
@@ -136,6 +173,18 @@ public class JRaftServerImpl implements RaftServer {
groupService.shutdown();
rpcServer.shutdown();
+
+ if (opts.getfSMCallerExecutorDisruptor() != null)
+ opts.getfSMCallerExecutorDisruptor().shutdown();
+
+ if (opts.getNodeApplyDisruptor() != null)
+ opts.getNodeApplyDisruptor().shutdown();
+
+ if (opts.getReadOnlyServiceDisruptor() != null)
+ opts.getReadOnlyServiceDisruptor().shutdown();
+
+ if (opts.getLogManagerDisruptor() != null)
+ opts.getLogManagerDisruptor().shutdown();
}
/** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 5492af6..36705b2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -23,13 +23,17 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
import org.apache.ignite.raft.jraft.core.Scheduler;
import org.apache.ignite.raft.jraft.core.TimerManager;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.BootstrapOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.StringUtils;
@@ -49,10 +53,50 @@ public final class JRaftUtils {
* @return true if bootstrap success
*/
public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException {
- final NodeImpl node = new NodeImpl();
+ final NodeImpl node = new NodeImpl("bootstrap", new PeerId("127.0.0.1", 0));
+
+ NodeOptions nodeOpts = opts.getNodeOptions();
+
+ nodeOpts.setStripes(1);
+
+ StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
+ StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
+ StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+ StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+ nodeOpts.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
+ "JRaft-FSMCaller-Disruptor_bootstrap",
+ nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new FSMCallerImpl.ApplyTask(),
+ nodeOpts.getStripes()));
+
+ nodeOpts.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
+ "JRaft-NodeImpl-Disruptor_bootstrap",
+ nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new NodeImpl.LogEntryAndClosure(),
+ nodeOpts.getStripes()));
+
+ nodeOpts.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
+ "JRaft-ReadOnlyService-Disruptor_bootstrap",
+ nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ nodeOpts.getStripes()));
+
+ nodeOpts.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
+ "JRaft-LogManager-Disruptor_bootstrap",
+ nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+ () -> new LogManagerImpl.StableClosureEvent(),
+ nodeOpts.getStripes()));
+
final boolean ret = node.bootstrap(opts);
node.shutdown();
node.join();
+
+ fsmCallerDusruptor.shutdown();
+ nodeDisruptor.shutdown();
+ readOnlyServiceDisruptor.shutdown();
+ logManagerDisruptor.shutdown();
+
return ret;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index f748aa1..4d93213 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -16,18 +16,14 @@
*/
package org.apache.ignite.raft.jraft.core;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.FSMCaller;
@@ -40,6 +36,8 @@ import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.TaskClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
@@ -54,10 +52,7 @@ import org.apache.ignite.raft.jraft.option.FSMCallerOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -100,7 +95,10 @@ public class FSMCallerImpl implements FSMCaller {
/**
* Apply task for disruptor.
*/
- private static class ApplyTask {
+ public static class ApplyTask implements GroupAware {
+ /** Raft group id. */
+ String groupId;
+
TaskType type;
// union fields
long committedIndex;
@@ -110,6 +108,11 @@ public class FSMCallerImpl implements FSMCaller {
Closure done;
CountDownLatch shutdownLatch;
+ /** {@inheritDoc} */
+ @Override public String groupId() {
+ return groupId;
+ }
+
public void reset() {
this.type = null;
this.committedIndex = 0;
@@ -118,14 +121,7 @@ public class FSMCallerImpl implements FSMCaller {
this.leaderChangeCtx = null;
this.done = null;
this.shutdownLatch = null;
- }
- }
-
- private static class ApplyTaskFactory implements EventFactory<ApplyTask> {
-
- @Override
- public ApplyTask newInstance() {
- return new ApplyTask();
+ this.groupId = null;
}
}
@@ -139,6 +135,9 @@ public class FSMCallerImpl implements FSMCaller {
}
}
+ /** Raft group id. */
+ String groupId;
+
private LogManager logManager;
private StateMachine fsm;
private ClosureQueue closureQueue;
@@ -149,7 +148,7 @@ public class FSMCallerImpl implements FSMCaller {
private volatile TaskType currTask;
private final AtomicLong applyingIndex;
private volatile RaftException error;
- private Disruptor<ApplyTask> disruptor;
+ private StripedDisruptor<ApplyTask> disruptor;
private RingBuffer<ApplyTask> taskQueue;
private volatile CountDownLatch shutdownLatch;
private NodeMetrics nodeMetrics;
@@ -165,6 +164,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean init(final FSMCallerOptions opts) {
+ this.groupId = opts.getGroupId();
this.logManager = opts.getLogManager();
this.fsm = opts.getFsm();
this.closureQueue = opts.getClosureQueue();
@@ -174,17 +174,11 @@ public class FSMCallerImpl implements FSMCaller {
this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
this.lastAppliedTerm = opts.getBootstrapId().getTerm();
- this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
- .setEventFactory(new ApplyTaskFactory()) //
- .setRingBufferSize(opts.getDisruptorBufferSize()) //
- .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-" +
- node.getOptions().getServerName() + "-" + node.getNodeId().toString(), true)) //
- .setProducerType(ProducerType.MULTI) //
- .setWaitStrategy(new BlockingWaitStrategy()) //
- .build();
- this.disruptor.handleEventsWith(new ApplyTaskHandler());
- this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
- this.taskQueue = this.disruptor.start();
+
+ disruptor = opts.getfSMCallerExecutorDisruptor();
+
+ taskQueue = disruptor.subscribe(groupId, new ApplyTaskHandler());
+
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry().register("jraft-fsm-caller-disruptor",
new DisruptorMetricSet(this.taskQueue));
@@ -207,6 +201,7 @@ public class FSMCallerImpl implements FSMCaller {
this.shutdownLatch = latch;
Utils.runInThread(this.node.getOptions().getCommonExecutor(), () -> this.taskQueue.publishEvent((task, sequence) -> {
task.reset();
+ task.groupId = groupId;
task.type = TaskType.SHUTDOWN;
task.shutdownLatch = latch;
}));
@@ -225,6 +220,7 @@ public class FSMCallerImpl implements FSMCaller {
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
+
if (!this.taskQueue.tryPublishEvent(tpl)) {
setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
"FSMCaller is overload.")));
@@ -236,6 +232,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onCommitted(final long committedIndex) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
});
@@ -248,6 +245,7 @@ public class FSMCallerImpl implements FSMCaller {
void flush() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.FLUSH;
task.shutdownLatch = latch;
});
@@ -257,6 +255,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.SNAPSHOT_LOAD;
task.done = done;
});
@@ -265,6 +264,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onSnapshotSave(final SaveSnapshotClosure done) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.SNAPSHOT_SAVE;
task.done = done;
});
@@ -273,6 +273,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onLeaderStop(final Status status) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
@@ -281,6 +282,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onLeaderStart(final long term) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.LEADER_START;
task.term = term;
});
@@ -289,6 +291,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onStartFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.START_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
});
@@ -297,6 +300,7 @@ public class FSMCallerImpl implements FSMCaller {
@Override
public boolean onStopFollowing(final LeaderChangeContext ctx) {
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.STOP_FOLLOWING;
task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
});
@@ -334,6 +338,7 @@ public class FSMCallerImpl implements FSMCaller {
}
final OnErrorClosure c = new OnErrorClosure(error);
return enqueueTask((task, sequence) -> {
+ task.groupId = groupId;
task.type = TaskType.ERROR;
task.done = c;
});
@@ -348,7 +353,7 @@ public class FSMCallerImpl implements FSMCaller {
public synchronized void join() throws InterruptedException {
if (this.shutdownLatch != null) {
this.shutdownLatch.await();
- this.disruptor.shutdown();
+ this.disruptor.unsubscribe(groupId);
if (this.afterShutdown != null) {
this.afterShutdown.run(Status.OK());
this.afterShutdown = null;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9d2df35..ed1d0b3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -16,6 +16,9 @@
*/
package org.apache.ignite.raft.jraft.core;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -30,13 +33,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.Closure;
@@ -54,6 +50,8 @@ import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.Ballot;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
@@ -106,10 +104,7 @@ import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Describer;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.RepeatedTimer;
import org.apache.ignite.raft.jraft.util.Requires;
@@ -192,7 +187,7 @@ public class NodeImpl implements Node, RaftServerService {
/**
* Disruptor to run node service
*/
- private Disruptor<LogEntryAndClosure> applyDisruptor;
+ private StripedDisruptor<LogEntryAndClosure> applyDisruptor;
private RingBuffer<LogEntryAndClosure> applyQueue;
/**
@@ -251,13 +246,22 @@ public class NodeImpl implements Node, RaftServerService {
/**
* Node service event.
*/
- private static class LogEntryAndClosure {
+ public static class LogEntryAndClosure implements GroupAware {
+ /** Raft group id. */
+ String groupId;
+
LogEntry entry;
Closure done;
long expectedTerm;
CountDownLatch shutdownLatch;
+ /** {@inheritDoc} */
+ @Override public String groupId() {
+ return groupId;
+ }
+
public void reset() {
+ this.groupId = null;
this.entry = null;
this.done = null;
this.expectedTerm = 0;
@@ -265,14 +269,6 @@ public class NodeImpl implements Node, RaftServerService {
}
}
- private static class LogEntryAndClosureFactory implements EventFactory<LogEntryAndClosure> {
-
- @Override
- public LogEntryAndClosure newInstance() {
- return new LogEntryAndClosure();
- }
- }
-
/**
* Event handler.
*/
@@ -281,8 +277,7 @@ public class NodeImpl implements Node, RaftServerService {
private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
@Override
- public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
- throws Exception {
+ public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch) throws Exception {
if (event.shutdownLatch != null) {
if (!this.tasks.isEmpty()) {
executeApplyingTasks(this.tasks);
@@ -512,10 +507,6 @@ public class NodeImpl implements Node, RaftServerService {
}
}
- public NodeImpl() {
- this(null, null);
- }
-
public NodeImpl(final String groupId, final PeerId serverId) {
super();
if (groupId != null) {
@@ -554,14 +545,16 @@ public class NodeImpl implements Node, RaftServerService {
this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
this.logManager = new LogManagerImpl();
final LogManagerOptions opts = new LogManagerOptions();
+ opts.setGroupId(groupId);
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
opts.setLogStorage(this.logStorage);
opts.setConfigurationManager(this.configManager);
opts.setNode(this);
opts.setFsmCaller(this.fsmCaller);
opts.setNodeMetrics(this.metrics);
- opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
opts.setRaftOptions(this.raftOptions);
+ opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
+
return this.logManager.init(opts);
}
@@ -741,8 +734,10 @@ public class NodeImpl implements Node, RaftServerService {
opts.setClosureQueue(this.closureQueue);
opts.setNode(this);
opts.setBootstrapId(bootstrapId);
- opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
+ opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
+ opts.setGroupId(groupId);
+
return this.fsmCaller.init(opts);
}
@@ -954,16 +949,10 @@ public class NodeImpl implements Node, RaftServerService {
this.configManager = new ConfigurationManager();
- this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
- .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
- .setEventFactory(new LogEntryAndClosureFactory()) //
- .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-" + suffix, true)) //
- .setProducerType(ProducerType.MULTI) //
- .setWaitStrategy(new BlockingWaitStrategy()) //
- .build();
- this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
- this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
- this.applyQueue = this.applyDisruptor.start();
+ applyDisruptor = opts.getNodeApplyDisruptor();
+
+ applyQueue = applyDisruptor.subscribe(groupId, new LogEntryAndClosureHandler());
+
if (this.metrics.getMetricRegistry() != null) {
this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
new DisruptorMetricSet(this.applyQueue));
@@ -1044,9 +1033,11 @@ public class NodeImpl implements Node, RaftServerService {
this.readOnlyService = new ReadOnlyServiceImpl();
final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
+ rosOpts.setGroupId(groupId);
rosOpts.setFsmCaller(this.fsmCaller);
rosOpts.setNode(this);
rosOpts.setRaftOptions(this.raftOptions);
+ rosOpts.setReadOnlyServiceDisruptor(opts.getReadOnlyServiceDisruptor());
if (!this.readOnlyService.init(rosOpts)) {
LOG.error("Fail to init readOnlyService.");
@@ -1603,6 +1594,7 @@ public class NodeImpl implements Node, RaftServerService {
try {
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
+ event.groupId = groupId;
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
@@ -2800,7 +2792,10 @@ public class NodeImpl implements Node, RaftServerService {
this.shutdownLatch = latch;
Utils.runInThread(this.getOptions().getCommonExecutor(),
- () -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
+ () -> this.applyQueue.publishEvent((event, sequence) -> {
+ event.groupId = groupId;
+ event.shutdownLatch = latch;
+ }));
}
if (this.timerManager != null) {
this.timerManager.shutdown();
@@ -2875,7 +2870,7 @@ public class NodeImpl implements Node, RaftServerService {
Replicator.join(this.wakingCandidate);
}
this.shutdownLatch.await();
- this.applyDisruptor.shutdown();
+ this.applyDisruptor.unsubscribe(groupId);
this.shutdownLatch = null;
}
if (this.fsmCaller != null) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 8da944a..31e6f56 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -16,6 +16,10 @@
*/
package org.apache.ignite.raft.jraft.core;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -25,19 +29,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
import org.apache.ignite.raft.jraft.ReadOnlyService;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.ReadIndexState;
import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -50,10 +49,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
import org.apache.ignite.raft.jraft.util.ByteString;
import org.apache.ignite.raft.jraft.util.Bytes;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.OnlyForTest;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
import org.apache.ignite.raft.jraft.util.Utils;
@@ -64,10 +60,13 @@ import org.apache.ignite.raft.jraft.util.Utils;
public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {
private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
+ /** RAFT group id. */
+ private String groupId;
+
/**
* Disruptor to run readonly service.
*/
- private Disruptor<ReadIndexEvent> readIndexDisruptor;
+ private StripedDisruptor<ReadIndexEvent> readIndexDisruptor;
private RingBuffer<ReadIndexEvent> readIndexQueue;
@@ -90,11 +89,19 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
private static final IgniteLogger LOG = IgniteLogger.forClass(ReadOnlyServiceImpl.class);
- private static class ReadIndexEvent {
+ public static class ReadIndexEvent implements GroupAware {
+ /** Raft group id. */
+ String groupId;
+
Bytes requestContext;
ReadIndexClosure done;
CountDownLatch shutdownLatch;
long startTime;
+
+ /** {@inheritDoc} */
+ @Override public String groupId() {
+ return groupId;
+ }
}
private static class ReadIndexEventFactory implements EventFactory<ReadIndexEvent> {
@@ -241,23 +248,16 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
@Override
public boolean init(final ReadOnlyServiceOptions opts) {
+ this.groupId = opts.getGroupId();
this.node = opts.getNode();
this.nodeMetrics = this.node.getNodeMetrics();
this.fsmCaller = opts.getFsmCaller();
this.raftOptions = opts.getRaftOptions();
- this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent>newInstance() //
- .setEventFactory(new ReadIndexEventFactory()) //
- .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
- .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-" +
- node.getOptions().getServerName() + "-" + node.getNodeId().toString(), true)) //
- .setWaitStrategy(new BlockingWaitStrategy()) //
- .setProducerType(ProducerType.MULTI) //
- .build();
- this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
- this.readIndexDisruptor
- .setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
- this.readIndexQueue = this.readIndexDisruptor.start();
+ readIndexDisruptor = opts.getReadOnlyServiceDisruptor();
+
+ readIndexQueue = readIndexDisruptor.subscribe(groupId, new ReadIndexEventHandler());
+
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry() //
.register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
@@ -285,7 +285,10 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
}
this.shutdownLatch = new CountDownLatch(1);
Utils.runInThread(this.node.getOptions().getCommonExecutor(),
- () -> this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch));
+ () -> this.readIndexQueue.publishEvent((event, sequence) -> {
+ event.groupId = this.groupId;
+ event.shutdownLatch = this.shutdownLatch;
+ }));
}
@Override
@@ -293,7 +296,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
if (this.shutdownLatch != null) {
this.shutdownLatch.await();
}
- this.readIndexDisruptor.shutdown();
+ this.readIndexDisruptor.unsubscribe(groupId);
resetPendingStatusError(new Status(RaftError.ESTOP, "Node is quit."));
}
@@ -306,6 +309,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
try {
EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
+ event.groupId = this.groupId;
event.done = closure;
event.requestContext = new Bytes(reqCtx);
event.startTime = Utils.monotonicMs();
@@ -388,7 +392,10 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
@OnlyForTest
void flush() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
- this.readIndexQueue.publishEvent((task, sequence) -> task.shutdownLatch = latch);
+ this.readIndexQueue.publishEvent((task, sequence) -> {
+ task.groupId = this.groupId;
+ task.shutdownLatch = latch;
+ });
latch.await();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
similarity index 91%
rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
index ddf8bb9..79aaa35 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ignite.raft.jraft.util;
+package org.apache.ignite.raft.jraft.disruptor;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
@@ -22,6 +22,8 @@ import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.util.Requires;
/**
* A builder to build a disruptor instance.
@@ -59,10 +61,10 @@ public class DisruptorBuilder<T> {
}
public ThreadFactory getThreadFactory() {
- return this.threadFactory;
+ return threadFactory;
}
- public DisruptorBuilder<T> setThreadFactory(final ThreadFactory threadFactory) {
+ public DisruptorBuilder<T> setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
@@ -88,6 +90,7 @@ public class DisruptorBuilder<T> {
public Disruptor<T> build() {
Requires.requireNonNull(this.ringBufferSize, " Ring buffer size not set");
Requires.requireNonNull(this.eventFactory, "Event factory not set");
+
return new Disruptor<>(this.eventFactory, this.ringBufferSize, this.threadFactory, this.producerType,
this.waitStrategy);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java
new file mode 100644
index 0000000..542b3f7
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.disruptor;
+
+/**
+ * Interface provides group id.
+ * It allows to determine a stripe in Striped disruptor.
+ */
+public interface GroupAware {
+ /**
+ * Gets a group id.
+ *
+ * @return Group id.
+ */
+ String groupId();
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
new file mode 100644
index 0000000..40f99e8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.disruptor;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ignite.lang.LoggerMessageHelper.format;
+
+/**
+ * Stripe Disruptor is a set of queues which process several independent groups in one queue (in the stripe).
+ * It makes fewer threads that the groups and gives the same sequential guaranties and a close performance.
+ *
+ * @param <T> Event type. This event should implement {@link GroupAware} interface.
+ */
+public class StripedDisruptor<T extends GroupAware> {
+ /** The logger. */
+ private static final Logger LOG = LoggerFactory.getLogger(StripedDisruptor.class);
+
+ /** Array of disruptors. Each Disruptor in the appropriate stripe. */
+ private final Disruptor<T>[] disruptors;
+
+ /** Array of Ring buffer. It placed according to disruptors in the array. */
+ private final RingBuffer<T>[] queues;
+
+ /** Disruptor event handler array. It placed according to disruptors in the array.*/
+ private final ArrayList<StripeEntryHandler> eventHandlers;
+
+ /** Disruptor error handler array. It placed according to disruptors in the array.*/
+ private final ArrayList<StripeExceptionHandler> exceptionHandlers;
+
+ /** Amount of stripes. */
+ private final int stripes;
+
+ /** The Striped disruptor name. */
+ private final String name;
+
+ /**
+ * @param name Name of the Striped disruptor.
+ * @param bufferSize Buffer size for each Disruptor.
+ * @param eventFactory Event factory for the Striped disruptor.
+ * @param stripes Amount of stripes.
+ */
+ public StripedDisruptor(String name, int bufferSize, EventFactory<T> eventFactory, int stripes) {
+ disruptors = new Disruptor[stripes];
+ queues = new RingBuffer[stripes];
+ eventHandlers = new ArrayList<>(stripes);
+ exceptionHandlers = new ArrayList<>(stripes);
+ this.stripes = stripes;
+ this.name = name;
+
+ for (int i = 0; i < stripes; i++) {
+ String stripeName = format("{}_stripe_{}-", name, i);
+
+ Disruptor<T> disruptor = DisruptorBuilder.<T>newInstance()
+ .setRingBufferSize(bufferSize)
+ .setEventFactory(eventFactory)
+ .setThreadFactory(new NamedThreadFactory(stripeName, true))
+ .setProducerType(ProducerType.MULTI)
+ .setWaitStrategy(new BlockingWaitStrategy())
+ .build();
+
+ eventHandlers.add(new StripeEntryHandler());
+ exceptionHandlers.add(new StripeExceptionHandler(name));
+
+ disruptor.handleEventsWith(eventHandlers.get(i));
+ disruptor.setDefaultExceptionHandler(exceptionHandlers.get(i));
+
+ queues[i] = disruptor.start();
+ disruptors[i] = disruptor;
+ }
+
+ LOG.info("Striped disruptor was started [name={}]", name);
+ }
+
+ /**
+ * Shutdowns all nested disruptors.
+ */
+ public void shutdown() {
+ for (int i = 0; i < stripes; i++)
+ disruptors[i].shutdown();
+
+ LOG.info("Striped disruptor stopped [name={}]", name);
+ }
+
+ /**
+ * Subscribes an event handler to one stripe of the Striped disruptor.
+ * The stripe is determined by a group id.
+ *
+ * @param group Group id.
+ * @param handler Event handler for the group specified.
+ * @return Disruptor queue appropriate to the group.
+ */
+ public RingBuffer<T> subscribe(String group, EventHandler<T> handler) {
+ return subscribe(group, handler, null);
+ }
+
+ /**
+ * Subscribes an event handler and a exception handler to one stripe of the Striped disruptor.
+ * The stripe is determined by a group id.
+ *
+ * @param group Group id.
+ * @param handler Event handler for the group specified.
+ * @param exceptionHandler Exception handler for the group specified.
+ * @return Disruptor queue appropriate to the group.
+ */
+ public RingBuffer<T> subscribe(String group, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
+ eventHandlers.get(getStripe(group)).subscribe(group, handler);
+
+ if (exceptionHandler != null)
+ exceptionHandlers.get(getStripe(group)).subscribe(group, exceptionHandler);
+
+ LOG.info("Consumer subscribed [poolName={}, group={}]", name, group);
+
+ return queues[getStripe(group)];
+ }
+
+ /**
+ * Unsubscribes group for the Striped disruptor.
+ *
+ * @param group Group id.
+ */
+ public void unsubscribe(String group) {
+ eventHandlers.get(getStripe(group)).unsubscribe(group);
+ exceptionHandlers.get(getStripe(group)).unsubscribe(group);
+
+ LOG.info("Consumer unsubscribe [poolName={}, group={}]", name, group);
+ }
+
+ /**
+ * Determines a stripe by a group id and returns a stripe number.
+ *
+ * @param group Group id.
+ * @return Stripe of the Striped disruptor.
+ */
+ private int getStripe(String group) {
+ return Math.abs(group.hashCode() % stripes);
+ }
+
+ /**
+ * Determines a Disruptor queue by a group id.
+ *
+ * @param groupId Group id.
+ * @return Disruptor queue appropriate to the group.
+ */
+ public RingBuffer<T> queue(String groupId) {
+ return queues[getStripe(groupId)];
+ }
+
+ /**
+ * Event handler for stripe of the Striped disruptor.
+ * It routs an event to the event handler for a group.
+ */
+ private class StripeEntryHandler implements EventHandler<T> {
+ private final ConcurrentHashMap<String, EventHandler<T>> subscrivers;
+
+ /**
+ * The constructor.
+ */
+ StripeEntryHandler() {
+ subscrivers = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Subscribes a group to appropriate events for it.
+ *
+ * @param group Group id.
+ * @param handler Event handler for the group specified.
+ */
+ void subscribe(String group, EventHandler<T> handler) {
+ subscrivers.put(group, handler);
+ }
+
+ /**
+ * Unsubscribes a group for any event.
+ *
+ * @param group Group id.
+ */
+ void unsubscribe(String group) {
+ subscrivers.remove(group);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onEvent(T event, long sequence, boolean endOfBatch) throws Exception {
+ EventHandler<T> handler = subscrivers.get(event.groupId());
+
+ assert handler != null : format("Group of the event is unsupported [group={}, event={}]", event.groupId(), event);
+
+ handler.onEvent(event, sequence, endOfBatch);
+ }
+ }
+
+ /**
+ * Striped disruptor exxception handler.
+ * It prints into log when an exception has occurred and route it to the handler for group.
+ */
+ private class StripeExceptionHandler implements ExceptionHandler<T> {
+ /** Name of the Disruptor instance. */
+ private final String name;
+
+ /** There are exception handlers per group. */
+ private final ConcurrentHashMap<String, BiConsumer<T, Throwable>> subscrivers;
+
+ /**
+ * @param name Name of the Disruptor instance.
+ */
+ StripeExceptionHandler(String name) {
+ this.name = name;
+ this.subscrivers = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Subscribes a group to an exception, that might happen during handling an event for the group.
+ *
+ * @param group Group id.
+ * @param handler Exception handler.
+ */
+ void subscribe(String group, BiConsumer<T, Throwable> handler) {
+ subscrivers.put(group, handler);
+ }
+
+ /**
+ * Unsubscribes a group for any exception.
+ *
+ * @param group Group id.
+ */
+ void unsubscribe(String group) {
+ subscrivers.remove(group);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleOnStartException(Throwable ex) {
+ LOG.error("Fail to start disruptor [name={}]", name, ex);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleOnShutdownException(Throwable ex) {
+ LOG.error("Fail to shutdown disruptor [name={}]", name, ex);
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void handleEventException(Throwable ex, long sequence, T event) {
+ BiConsumer<T, Throwable> handler = subscrivers.get(event.groupId());
+
+ LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", name, event, handler != null, ex);
+
+ if (handler != null)
+ handler.accept(event, ex);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return format("{} [name={}]", StripedDisruptor.class.getSimpleName(), name);
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 913b5a2..dce9098 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -20,7 +20,9 @@ import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.closure.ClosureQueue;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.storage.LogManager;
@@ -28,6 +30,9 @@ import org.apache.ignite.raft.jraft.storage.LogManager;
* FSM caller options.
*/
public class FSMCallerOptions {
+ /** Raft group id. */
+ private String groupId;
+
private LogManager logManager;
private StateMachine fsm;
private Closure afterShutdown;
@@ -35,17 +40,22 @@ public class FSMCallerOptions {
private ClosureQueue closureQueue;
private NodeImpl node;
private RaftMessagesFactory raftMessagesFactory;
- /**
- * disruptor buffer size.
- */
- private int disruptorBufferSize = 1024;
+ private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
- public int getDisruptorBufferSize() {
- return this.disruptorBufferSize;
+ public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+ return fSMCallerExecutorDisruptor;
}
- public void setDisruptorBufferSize(int disruptorBufferSize) {
- this.disruptorBufferSize = disruptorBufferSize;
+ public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+ this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
}
public NodeImpl getNode() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
index d22d317..32e317e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
@@ -20,22 +20,43 @@ import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
/**
* Options for log manager.
*/
public class LogManagerOptions {
+ /** Raft group id. */
+ private String groupId;
+
private Node node;
private LogStorage logStorage;
private ConfigurationManager configurationManager;
private FSMCaller fsmCaller;
- private int disruptorBufferSize = 1024;
private RaftOptions raftOptions;
private NodeMetrics nodeMetrics;
private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+ private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+ return logManagerDisruptor;
+ }
+
+ public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+ this.logManagerDisruptor = logManagerDisruptor;
+ }
public LogEntryCodecFactory getLogEntryCodecFactory() {
return this.logEntryCodecFactory;
@@ -61,14 +82,6 @@ public class LogManagerOptions {
this.raftOptions = raftOptions;
}
- public int getDisruptorBufferSize() {
- return this.disruptorBufferSize;
- }
-
- public void setDisruptorBufferSize(final int disruptorBufferSize) {
- this.disruptorBufferSize = disruptorBufferSize;
- }
-
public LogStorage getLogStorage() {
return this.logStorage;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 570a096..94c68e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -35,6 +35,9 @@ import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
* Node options.
*/
public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
+ /** This value is used by default to determine the count of stripes in the striped queue. */
+ public static final int DEFAULT_STRIPES = Utils.cpus() * 2;
+
// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
// Default: 1000 (1s)
@@ -198,12 +201,29 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
/** Server name. */
private String serverName;
+ /** Amount of Disruptors that will handle the RAFT server. */
+ private int stripes = DEFAULT_STRIPES;
+
public NodeOptions() {
raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
raftOptions.setRaftClientMessagesFactory(getRaftClientMessagesFactory());
}
/**
+ * @return Stripe count.
+ */
+ public int getStripes() {
+ return stripes;
+ }
+
+ /**
+ * @param stripes Stripe count.
+ */
+ public void setStripes(int stripes) {
+ this.stripes = stripes;
+ }
+
+ /**
* The rpc client.
*/
public JRaftServiceFactory getServiceFactory() {
@@ -507,6 +527,10 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
nodeOptions.setServerName(this.getServerName());
nodeOptions.setScheduler(this.getScheduler());
nodeOptions.setClientExecutor(this.getClientExecutor());
+ nodeOptions.setNodeApplyDisruptor(this.getNodeApplyDisruptor());
+ nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor());
+ nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor());
+ nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor());
return nodeOptions;
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
index aa963c3..cb1ea95 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
@@ -18,15 +18,36 @@ package org.apache.ignite.raft.jraft.option;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
/**
* Read-Only service options.
*/
public class ReadOnlyServiceOptions {
+ /** Raft group id. */
+ private String groupId;
private RaftOptions raftOptions;
private NodeImpl node;
private FSMCaller fsmCaller;
+ private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+ return readOnlyServiceDisruptor;
+ }
+
+ public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+ this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+ }
public NodeImpl getNode() {
return node;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
index 5dc3fb4..b1af8a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
@@ -16,11 +16,16 @@
*/
package org.apache.ignite.raft.jraft.option;
-import java.util.concurrent.ExecutorService;
import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
public class RpcOptions {
/** Raft message factory. */
@@ -64,6 +69,50 @@ public class RpcOptions {
*/
private ExecutorService clientExecutor;
+ /** Striped disruptor for FSMCaller service. The queue serves of an Append entry requests in the RAFT state machine. */
+ private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+ /** Striped disruptor for Node apply service. */
+ private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
+
+ /** Striped disruptor for Read only service. */
+ private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+ /** Striped disruptor for Log manager service. */
+ private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+ public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+ return fSMCallerExecutorDisruptor;
+ }
+
+ public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+ this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
+ }
+
+ public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
+ return nodeApplyDisruptor;
+ }
+
+ public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
+ this.nodeApplyDisruptor = nodeApplyDisruptor;
+ }
+
+ public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+ return readOnlyServiceDisruptor;
+ }
+
+ public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+ this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+ }
+
+ public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+ return logManagerDisruptor;
+ }
+
+ public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+ this.logManagerDisruptor = logManagerDisruptor;
+ }
+
/**
* Metric registry for RPC services, user should not use this field.
*/
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index c3beacf..16f2b3a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -16,23 +16,18 @@
*/
package org.apache.ignite.raft.jraft.storage.impl;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.FSMCaller;
import org.apache.ignite.raft.jraft.Status;
@@ -40,6 +35,8 @@ import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -56,10 +53,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogManager;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.raft.jraft.util.ArrayDeque;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
import org.apache.ignite.raft.jraft.util.Requires;
import org.apache.ignite.raft.jraft.util.SegmentList;
import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -73,6 +67,9 @@ public class LogManagerImpl implements LogManager {
private static final IgniteLogger LOG = IgniteLogger.forClass(LogManagerImpl.class);
+ /** Raft group id. */
+ String groupId;
+
private LogStorage logStorage;
private ConfigurationManager configManager;
private FSMCaller fsmCaller;
@@ -89,7 +86,7 @@ public class LogManagerImpl implements LogManager {
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
private final Map<Long, WaitMeta> waitMap = new HashMap<>();
- private Disruptor<StableClosureEvent> disruptor;
+ private StripedDisruptor<StableClosureEvent> disruptor;
private RingBuffer<StableClosureEvent> diskQueue;
private RaftOptions raftOptions;
private volatile CountDownLatch shutDownLatch;
@@ -106,23 +103,25 @@ public class LogManagerImpl implements LogManager {
LAST_LOG_ID // get last log id
}
- private static class StableClosureEvent {
+ public static class StableClosureEvent implements GroupAware {
+ /** Raft group id. */
+ String groupId;
+
StableClosure done;
EventType type;
+ /** {@inheritDoc} */
+ @Override public String groupId() {
+ return groupId;
+ }
+
void reset() {
+ this.groupId = null;
this.done = null;
this.type = null;
}
}
- private static class StableClosureEventFactory implements EventFactory<StableClosureEvent> {
- @Override
- public StableClosureEvent newInstance() {
- return new StableClosureEvent();
- }
- }
-
/**
* Waiter metadata
*/
@@ -175,6 +174,7 @@ public class LogManagerImpl implements LogManager {
this.logStorage = opts.getLogStorage();
this.configManager = opts.getConfigurationManager();
this.nodeOptions = opts.getNode().getOptions();
+ this.groupId = opts.getGroupId();
LogStorageOptions lsOpts = new LogStorageOptions();
lsOpts.setConfigurationManager(this.configManager);
@@ -188,22 +188,11 @@ public class LogManagerImpl implements LogManager {
this.lastLogIndex = this.logStorage.getLastLogIndex();
this.diskId = new LogId(this.lastLogIndex, getTermFromLogStorage(this.lastLogIndex));
this.fsmCaller = opts.getFsmCaller();
- this.disruptor = DisruptorBuilder.<StableClosureEvent>newInstance() //
- .setEventFactory(new StableClosureEventFactory()) //
- .setRingBufferSize(opts.getDisruptorBufferSize()) //
- .setThreadFactory(new NamedThreadFactory("JRaft-LogManager-Disruptor-" +
- opts.getNode().getOptions().getServerName() + "-" + opts.getNode().getNodeId(), true)) //
- .setProducerType(ProducerType.MULTI) //
- /*
- * Use timeout strategy in log manager. If timeout happens, it will called reportError to halt the node.
- */
- .setWaitStrategy(new TimeoutBlockingWaitStrategy(
- this.raftOptions.getDisruptorPublishEventWaitTimeoutSecs(), TimeUnit.SECONDS)) //
- .build();
- this.disruptor.handleEventsWith(new StableClosureEventHandler());
- this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName(),
- (event, ex) -> reportError(-1, "LogManager handle event error")));
- this.diskQueue = this.disruptor.start();
+ this.disruptor = opts.getLogManagerDisruptor();
+
+ this.diskQueue = disruptor.subscribe(groupId, new StableClosureEventHandler(),
+ (event, ex) -> reportError(-1, "LogManager handle event error"));
+
if (this.nodeMetrics.getMetricRegistry() != null) {
this.nodeMetrics.getMetricRegistry().register("jraft-log-manager-disruptor",
new DisruptorMetricSet(this.diskQueue));
@@ -219,6 +208,7 @@ public class LogManagerImpl implements LogManager {
this.shutDownLatch = new CountDownLatch(1);
Utils.runInThread(nodeOptions.getCommonExecutor(), () -> this.diskQueue.publishEvent((event, sequence) -> {
event.reset();
+ event.groupId = groupId;
event.type = EventType.SHUTDOWN;
}));
}
@@ -229,7 +219,7 @@ public class LogManagerImpl implements LogManager {
return;
}
this.shutDownLatch.await();
- this.disruptor.shutdown();
+ this.disruptor.unsubscribe(groupId);
}
@Override
@@ -328,6 +318,7 @@ public class LogManagerImpl implements LogManager {
int retryTimes = 0;
final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
event.reset();
+ event.groupId = groupId;
event.type = EventType.OTHER;
event.done = done;
};
@@ -363,6 +354,7 @@ public class LogManagerImpl implements LogManager {
}
if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
event.reset();
+ event.groupId = groupId;
event.type = type;
event.done = done;
})) {
@@ -376,6 +368,7 @@ public class LogManagerImpl implements LogManager {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
return true;
}
+
return this.diskQueue.tryPublishEvent(translator);
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java
deleted file mode 100644
index 0b9b4b6..0000000
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.jraft.util;
-
-import com.lmax.disruptor.ExceptionHandler;
-import org.apache.ignite.lang.IgniteLogger;
-
-/**
- * Disruptor exception handler.
- */
-public final class LogExceptionHandler<T> implements ExceptionHandler<T> {
-
- private static final IgniteLogger LOG = IgniteLogger.forClass(LogExceptionHandler.class);
-
- public interface OnEventException<T> {
-
- void onException(T event, Throwable ex);
- }
-
- private final String name;
- private final OnEventException<T> onEventException;
-
- public LogExceptionHandler(String name) {
- this(name, null);
- }
-
- public LogExceptionHandler(String name, OnEventException<T> onEventException) {
- this.name = name;
- this.onEventException = onEventException;
- }
-
- @Override
- public void handleOnStartException(Throwable ex) {
- LOG.error("Fail to start {} disruptor", ex, this.name);
- }
-
- @Override
- public void handleOnShutdownException(Throwable ex) {
- LOG.error("Fail to shutdown {}r disruptor", ex, this.name);
-
- }
-
- @Override
- public void handleEventException(Throwable ex, long sequence, T event) {
- LOG.error("Handle {} disruptor event error, event is {}", ex, this.name, event);
- if (this.onEventException != null) {
- this.onEventException.onException(event, ex);
- }
- }
-}
\ No newline at end of file
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index ae0bd6a..b0d56ad 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -25,13 +25,12 @@ import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ClosureQueueImpl;
import org.apache.ignite.raft.jraft.closure.LoadSnapshotClosure;
import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
-import org.apache.ignite.raft.jraft.entity.NodeId;
-import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.error.RaftException;
@@ -67,6 +66,9 @@ public class FSMCallerTest {
private LogManager logManager;
private ClosureQueueImpl closureQueue;
+ /** Disruptor for this service test. */
+ private StripedDisruptor disruptor;
+
@BeforeEach
public void setup() {
this.fsmCaller = new FSMCallerImpl();
@@ -75,7 +77,6 @@ public class FSMCallerTest {
this.closureQueue = new ClosureQueueImpl(options);
opts = new FSMCallerOptions();
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
- Mockito.when(this.node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost", 8082)));
Mockito.when(this.node.getOptions()).thenReturn(options);
opts.setNode(this.node);
opts.setFsm(this.fsm);
@@ -83,6 +84,11 @@ public class FSMCallerTest {
opts.setBootstrapId(new LogId(10, 1));
opts.setClosureQueue(this.closureQueue);
opts.setRaftMessagesFactory(new RaftMessagesFactory());
+ opts.setGroupId("TestSrv");
+ opts.setfSMCallerExecutorDisruptor(disruptor = new StripedDisruptor<>("TestFSMDisruptor",
+ 1024,
+ () -> new FSMCallerImpl.ApplyTask(),
+ 1));
assertTrue(this.fsmCaller.init(opts));
}
@@ -91,6 +97,7 @@ public class FSMCallerTest {
if (this.fsmCaller != null) {
this.fsmCaller.shutdown();
this.fsmCaller.join();
+ disruptor.shutdown();
}
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 2586420..53d9f98 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.ReadIndexState;
@@ -67,6 +68,9 @@ public class ReadOnlyServiceTest {
@Mock
private FSMCaller fsmCaller;
+ /** Disruptor for this service test. */
+ private StripedDisruptor disruptor;
+
@BeforeEach
public void setup() {
this.readOnlyServiceImpl = new ReadOnlyServiceImpl();
@@ -76,6 +80,11 @@ public class ReadOnlyServiceTest {
opts.setFsmCaller(this.fsmCaller);
opts.setNode(this.node);
opts.setRaftOptions(raftOptions);
+ opts.setGroupId("TestSrv");
+ opts.setReadOnlyServiceDisruptor(disruptor = new StripedDisruptor<>("TestReadOnlyServiceDisruptor",
+ 1024,
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ 1));
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, "unittest"));
@@ -93,6 +102,7 @@ public class ReadOnlyServiceTest {
public void teardown() throws Exception {
this.readOnlyServiceImpl.shutdown();
this.readOnlyServiceImpl.join();
+ disruptor.shutdown();
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 868d4f4..369b6b8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -46,12 +47,14 @@ import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
import org.apache.ignite.raft.jraft.test.TestUtils;
import org.apache.ignite.raft.jraft.util.Endpoint;
import org.jetbrains.annotations.Nullable;
@@ -85,6 +88,26 @@ public class TestCluster {
private final Lock lock = new ReentrantLock();
private final Consumer<NodeOptions> optsClo;
+ /**
+ * These disruptors will be used for all RAFT servers in the cluster.
+ */
+ private final HashMap<Endpoint, StripedDisruptor<FSMCallerImpl.ApplyTask>> fsmCallerDusruptors = new HashMap<>();
+
+ /**
+ * These disruptors will be used for all RAFT servers in the cluster.
+ */
+ private final HashMap<Endpoint, StripedDisruptor<NodeImpl.LogEntryAndClosure>> nodeDisruptors = new HashMap<>();
+
+ /**
+ * These disruptors will be used for all RAFT servers in the cluster.
+ */
+ private final HashMap<Endpoint, StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>> readOnlyServiceDisruptors = new HashMap<>();
+
+ /**
+ * These disruptors will be used for all RAFT servers in the cluster.
+ */
+ private final HashMap<Endpoint, StripedDisruptor<LogManagerImpl.StableClosureEvent>> logManagerDisruptors = new HashMap<>();
+
private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory();
private LinkedHashSet<PeerId> learners;
@@ -206,6 +229,30 @@ public class TestCluster {
nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
nodeOptions.setElectionPriority(priority);
+ nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+ "JRaft-FSMCaller-Disruptor_TestCluster",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new FSMCallerImpl.ApplyTask(),
+ nodeOptions.getStripes())));
+
+ nodeOptions.setNodeApplyDisruptor(nodeDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+ "JRaft-NodeImpl-Disruptor_TestCluster",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new NodeImpl.LogEntryAndClosure(),
+ nodeOptions.getStripes())));
+
+ nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+ "JRaft-ReadOnlyService-Disruptor_TestCluster",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+ nodeOptions.getStripes())));
+
+ nodeOptions.setLogManagerDisruptor(logManagerDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+ "JRaft-LogManager-Disruptor_TestCluster",
+ nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+ () -> new LogManagerImpl.StableClosureEvent(),
+ nodeOptions.getStripes())));
+
final MockStateMachine fsm = new MockStateMachine(listenAddr);
nodeOptions.setFsm(fsm);
@@ -319,6 +366,11 @@ public class TestCluster {
final List<Endpoint> addrs = getAllNodes();
for (final Endpoint addr : addrs)
stop(addr);
+
+ fsmCallerDusruptors.values().forEach(StripedDisruptor::shutdown);
+ nodeDisruptors.values().forEach(StripedDisruptor::shutdown);
+ readOnlyServiceDisruptors.values().forEach(StripedDisruptor::shutdown);
+ logManagerDisruptors.values().forEach(StripedDisruptor::shutdown);
}
public void clean(final Endpoint listenAddr) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index ab8211a..04c36df 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -26,11 +26,10 @@ import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
-import org.apache.ignite.raft.jraft.entity.NodeId;
-import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
import org.apache.ignite.raft.jraft.option.LogManagerOptions;
@@ -72,6 +71,9 @@ public class LogManagerTest extends BaseStorageTest {
private LogStorage logStorage;
+ /** Disruptor for this service test. */
+ private StripedDisruptor disruptor;
+
@BeforeEach
public void setup() throws Exception {
this.confManager = new ConfigurationManager();
@@ -80,7 +82,6 @@ public class LogManagerTest extends BaseStorageTest {
this.logManager = new LogManagerImpl();
final LogManagerOptions opts = new LogManagerOptions();
- Mockito.when(node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost", 8082)));
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
Mockito.when(node.getOptions()).thenReturn(nodeOptions);
@@ -92,6 +93,11 @@ public class LogManagerTest extends BaseStorageTest {
opts.setNodeMetrics(new NodeMetrics(false));
opts.setLogStorage(this.logStorage);
opts.setRaftOptions(raftOptions);
+ opts.setGroupId("TestSrv");
+ opts.setLogManagerDisruptor(disruptor = new StripedDisruptor<>("TestLogManagerDisruptor",
+ 1024,
+ () -> new LogManagerImpl.StableClosureEvent(),
+ 1));
assertTrue(this.logManager.init(opts));
}
@@ -102,6 +108,7 @@ public class LogManagerTest extends BaseStorageTest {
@AfterEach
public void teardown() throws Exception {
this.logStorage.shutdown();
+ disruptor.shutdown();
}
@Test