You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/08/03 11:44:20 UTC

[ignite-3] branch main updated: IGNITE-14841 Use shared disruptor pool for multiple raft group nodes. Fixes #207

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a09d67  IGNITE-14841 Use shared disruptor pool for multiple raft group nodes. Fixes #207
7a09d67 is described below

commit 7a09d67231090d29533d27bda845b9b967d3db86
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Aug 3 14:43:36 2021 +0300

    IGNITE-14841 Use shared disruptor pool for multiple raft group nodes. Fixes #207
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../apache/ignite/raft/jraft/core/ITNodeTest.java  |  39 ++-
 .../raft/server/ITJRaftCounterServerTest.java      |  58 ++++-
 .../internal/raft/server/impl/JRaftServerImpl.java |  49 ++++
 .../org/apache/ignite/raft/jraft/JRaftUtils.java   |  46 +++-
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |  69 ++---
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  75 +++---
 .../raft/jraft/core/ReadOnlyServiceImpl.java       |  61 +++--
 .../{util => disruptor}/DisruptorBuilder.java      |   9 +-
 .../ignite/raft/jraft/disruptor/GroupAware.java    |  31 +++
 .../raft/jraft/disruptor/StripedDisruptor.java     | 282 +++++++++++++++++++++
 .../ignite/raft/jraft/option/FSMCallerOptions.java |  26 +-
 .../raft/jraft/option/LogManagerOptions.java       |  31 ++-
 .../ignite/raft/jraft/option/NodeOptions.java      |  24 ++
 .../raft/jraft/option/ReadOnlyServiceOptions.java  |  21 ++
 .../ignite/raft/jraft/option/RpcOptions.java       |  51 +++-
 .../raft/jraft/storage/impl/LogManagerImpl.java    |  67 +++--
 .../raft/jraft/util/LogExceptionHandler.java       |  64 -----
 .../ignite/raft/jraft/core/FSMCallerTest.java      |  13 +-
 .../raft/jraft/core/ReadOnlyServiceTest.java       |  10 +
 .../apache/ignite/raft/jraft/core/TestCluster.java |  52 ++++
 .../raft/jraft/storage/impl/LogManagerTest.java    |  13 +-
 21 files changed, 858 insertions(+), 233 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
index f325312..2301939 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ITNodeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import com.codahale.metrics.ConsoleReporter;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Stream;
-import com.codahale.metrics.ConsoleReporter;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.lang.IgniteLogger;
@@ -60,6 +60,7 @@ import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
 import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.Task;
@@ -79,6 +80,7 @@ import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle;
 import org.apache.ignite.raft.jraft.test.TestUtils;
@@ -3386,6 +3388,36 @@ public class ITNodeTest {
      */
     private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) {
         Configuration initialConf = nodeOptions.getInitialConf();
+        nodeOptions.setStripes(1);
+
+        StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
+        StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
+        StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+        StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+        nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
+            "JRaft-FSMCaller-Disruptor_ITNodeTest",
+            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+            () -> new FSMCallerImpl.ApplyTask(),
+            nodeOptions.getStripes()));
+
+        nodeOptions.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
+            "JRaft-NodeImpl-Disruptor_ITNodeTest",
+            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+            () -> new NodeImpl.LogEntryAndClosure(),
+            nodeOptions.getStripes()));
+
+        nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
+            "JRaft-ReadOnlyService-Disruptor_ITNodeTest",
+            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+            () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+            nodeOptions.getStripes()));
+
+        nodeOptions.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
+            "JRaft-LogManager-Disruptor_ITNodeTest",
+            nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+            () -> new LogManagerImpl.StableClosureEvent(),
+            nodeOptions.getStripes()));
 
         Stream<PeerId> peers = initialConf == null ?
             Stream.empty() :
@@ -3411,6 +3443,11 @@ public class ITNodeTest {
                 super.shutdown();
 
                 clusterService.stop();
+
+                fsmCallerDusruptor.shutdown();
+                nodeDisruptor.shutdown();
+                readOnlyServiceDisruptor.shutdown();
+                logManagerDisruptor.shutdown();
             }
         };
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index 7a4ac4c..b541005 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -23,6 +23,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -45,6 +46,8 @@ import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.option.NodeOptions;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -109,7 +112,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     /**
      * Servers list.
      */
-    protected final List<JRaftServerImpl> servers = new ArrayList<>();
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
 
     /**
      * Clients list.
@@ -230,6 +233,53 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     }
 
     /**
+     * Checks that the number of Disruptor threads does not depend on the number of started RAFT nodes.
+     */
+    @Test
+    public void testDisruptorThreadsCount() {
+        startServer(0, raftServer -> {
+            raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
+        });
+
+        Set<Thread> threads = getAllDisruptoCurrentThreads();
+
+        int threadsBefore = threads.size();
+
+        Set<String> threadNamesBefore = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+
+        assertEquals(NodeOptions.DEFAULT_STRIPES * 4/*services*/, threadsBefore, "Started thread names: " + threadNamesBefore);
+
+        servers.forEach(srv -> {
+            for (int i = 0; i < 10; i++)
+                srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF);
+        });
+
+        threads = getAllDisruptoCurrentThreads();
+
+        int threadsAfter = threads.size();
+
+        Set<String> threadNamesAfter = threads.stream().map(Thread::getName).collect(Collectors.toSet());
+
+        threadNamesAfter.removeAll(threadNamesBefore);
+
+        assertEquals(threadsBefore, threadsAfter, "Difference: " + threadNamesAfter);
+    }
+
+    /**
+     * Gets the set of Disruptor threads that belong to the well-known JRaft services.
+     *
+     * @return Set of Disruptor threads.
+     */
+    @NotNull private Set<Thread> getAllDisruptoCurrentThreads() {
+        return Thread.getAllStackTraces().keySet().stream().filter(t ->
+            t.getName().contains("JRaft-FSMCaller-Disruptor") ||
+                t.getName().contains("JRaft-NodeImpl-Disruptor") ||
+                t.getName().contains("JRaft-ReadOnlyService-Disruptor") ||
+                t.getName().contains("JRaft-LogManager-Disruptor"))
+            .collect(Collectors.toSet());
+    }
+
+    /**
      *
      */
     @Test
@@ -420,7 +470,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
         client1.refreshLeader().get();
         client2.refreshLeader().get();
 
-        NodeImpl leader = servers.stream().map(s -> ((NodeImpl) s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
+        NodeImpl leader = servers.stream().map(s -> ((NodeImpl)s.raftGroupService(COUNTER_GROUP_0).getRaftNode())).
             filter(n -> n.getState() == STATE_LEADER).findFirst().orElse(null);
 
         assertNotNull(leader);
@@ -611,8 +661,8 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
         org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(groupId);
 
         JRaftServerImpl.DelegatingStateMachine fsm0 =
-            (JRaftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
+            (JRaftServerImpl.DelegatingStateMachine)svc.getRaftNode().getOptions().getFsm();
 
-        return expected == ((CounterListener) fsm0.getListener()).value();
+        return expected == ((CounterListener)fsm0.getListener()).value();
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 840c6db..b383b26 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -43,12 +43,17 @@ import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
 import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.util.JDKMarshaller;
@@ -127,6 +132,38 @@ public class JRaftServerImpl implements RaftServer {
             JRaftUtils.createRequestExecutor(opts)
         );
 
+        if (opts.getfSMCallerExecutorDisruptor() == null) {
+            opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>(
+                "JRaft-FSMCaller-Disruptor",
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new FSMCallerImpl.ApplyTask(),
+                opts.getStripes()));
+        }
+
+        if (opts.getNodeApplyDisruptor() == null) {
+            opts.setNodeApplyDisruptor(new StripedDisruptor<NodeImpl.LogEntryAndClosure>(
+                "JRaft-NodeImpl-Disruptor",
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new NodeImpl.LogEntryAndClosure(),
+                opts.getStripes()));
+        }
+
+        if (opts.getReadOnlyServiceDisruptor() == null) {
+            opts.setReadOnlyServiceDisruptor(new StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>(
+                "JRaft-ReadOnlyService-Disruptor",
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+                opts.getStripes()));
+        }
+
+        if (opts.getLogManagerDisruptor() == null) {
+            opts.setLogManagerDisruptor(new StripedDisruptor<LogManagerImpl.StableClosureEvent>(
+                "JRaft-LogManager-Disruptor",
+                opts.getRaftOptions().getDisruptorBufferSize(),
+                () -> new LogManagerImpl.StableClosureEvent(),
+                opts.getStripes()));
+        }
+
         rpcServer.init(null);
     }
 
@@ -136,6 +173,18 @@ public class JRaftServerImpl implements RaftServer {
             groupService.shutdown();
 
         rpcServer.shutdown();
+
+        if (opts.getfSMCallerExecutorDisruptor() != null)
+            opts.getfSMCallerExecutorDisruptor().shutdown();
+
+        if (opts.getNodeApplyDisruptor() != null)
+            opts.getNodeApplyDisruptor().shutdown();
+
+        if (opts.getReadOnlyServiceDisruptor() != null)
+            opts.getReadOnlyServiceDisruptor().shutdown();
+
+        if (opts.getLogManagerDisruptor() != null)
+            opts.getLogManagerDisruptor().shutdown();
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
index 5492af6..36705b2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftUtils.java
@@ -23,13 +23,17 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
 import org.apache.ignite.raft.jraft.core.Scheduler;
 import org.apache.ignite.raft.jraft.core.TimerManager;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.BootstrapOptions;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RpcOptions;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.util.StringUtils;
@@ -49,10 +53,50 @@ public final class JRaftUtils {
      * @return true if bootstrap success
      */
     public static boolean bootstrap(final BootstrapOptions opts) throws InterruptedException {
-        final NodeImpl node = new NodeImpl();
+        final NodeImpl node = new NodeImpl("bootstrap", new PeerId("127.0.0.1", 0));
+
+        NodeOptions nodeOpts = opts.getNodeOptions();
+
+        nodeOpts.setStripes(1);
+
+        StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor;
+        StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor;
+        StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+        StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+        nodeOpts.setfSMCallerExecutorDisruptor(fsmCallerDusruptor = new StripedDisruptor<>(
+            "JRaft-FSMCaller-Disruptor_bootstrap",
+            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+            () -> new FSMCallerImpl.ApplyTask(),
+            nodeOpts.getStripes()));
+
+        nodeOpts.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>(
+            "JRaft-NodeImpl-Disruptor_bootstrap",
+            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+            () -> new NodeImpl.LogEntryAndClosure(),
+            nodeOpts.getStripes()));
+
+        nodeOpts.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>(
+            "JRaft-ReadOnlyService-Disruptor_bootstrap",
+            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+            () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+            nodeOpts.getStripes()));
+
+        nodeOpts.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>(
+            "JRaft-LogManager-Disruptor_bootstrap",
+            nodeOpts.getRaftOptions().getDisruptorBufferSize(),
+            () -> new LogManagerImpl.StableClosureEvent(),
+            nodeOpts.getStripes()));
+
         final boolean ret = node.bootstrap(opts);
         node.shutdown();
         node.join();
+
+        fsmCallerDusruptor.shutdown();
+        nodeDisruptor.shutdown();
+        readOnlyServiceDisruptor.shutdown();
+        logManagerDisruptor.shutdown();
+
         return ret;
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index f748aa1..4d93213 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -16,18 +16,14 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.FSMCaller;
@@ -40,6 +36,8 @@ import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
 import org.apache.ignite.raft.jraft.closure.TaskClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
 import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
@@ -54,10 +52,7 @@ import org.apache.ignite.raft.jraft.option.FSMCallerOptions;
 import org.apache.ignite.raft.jraft.storage.LogManager;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -100,7 +95,10 @@ public class FSMCallerImpl implements FSMCaller {
     /**
      * Apply task for disruptor.
      */
-    private static class ApplyTask {
+    public static class ApplyTask implements GroupAware {
+        /** Raft group id. */
+        String groupId;
+
         TaskType type;
         // union fields
         long committedIndex;
@@ -110,6 +108,11 @@ public class FSMCallerImpl implements FSMCaller {
         Closure done;
         CountDownLatch shutdownLatch;
 
+        /** {@inheritDoc} */
+        @Override public String groupId() {
+            return groupId;
+        }
+
         public void reset() {
             this.type = null;
             this.committedIndex = 0;
@@ -118,14 +121,7 @@ public class FSMCallerImpl implements FSMCaller {
             this.leaderChangeCtx = null;
             this.done = null;
             this.shutdownLatch = null;
-        }
-    }
-
-    private static class ApplyTaskFactory implements EventFactory<ApplyTask> {
-
-        @Override
-        public ApplyTask newInstance() {
-            return new ApplyTask();
+            this.groupId = null;
         }
     }
 
@@ -139,6 +135,9 @@ public class FSMCallerImpl implements FSMCaller {
         }
     }
 
+    /** Raft group id. */
+    String groupId;
+
     private LogManager logManager;
     private StateMachine fsm;
     private ClosureQueue closureQueue;
@@ -149,7 +148,7 @@ public class FSMCallerImpl implements FSMCaller {
     private volatile TaskType currTask;
     private final AtomicLong applyingIndex;
     private volatile RaftException error;
-    private Disruptor<ApplyTask> disruptor;
+    private StripedDisruptor<ApplyTask> disruptor;
     private RingBuffer<ApplyTask> taskQueue;
     private volatile CountDownLatch shutdownLatch;
     private NodeMetrics nodeMetrics;
@@ -165,6 +164,7 @@ public class FSMCallerImpl implements FSMCaller {
 
     @Override
     public boolean init(final FSMCallerOptions opts) {
+        this.groupId = opts.getGroupId();
         this.logManager = opts.getLogManager();
         this.fsm = opts.getFsm();
         this.closureQueue = opts.getClosureQueue();
@@ -174,17 +174,11 @@ public class FSMCallerImpl implements FSMCaller {
         this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
         notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
         this.lastAppliedTerm = opts.getBootstrapId().getTerm();
-        this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
-            .setEventFactory(new ApplyTaskFactory()) //
-            .setRingBufferSize(opts.getDisruptorBufferSize()) //
-            .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-" +
-                node.getOptions().getServerName() + "-" + node.getNodeId().toString(), true)) //
-            .setProducerType(ProducerType.MULTI) //
-            .setWaitStrategy(new BlockingWaitStrategy()) //
-            .build();
-        this.disruptor.handleEventsWith(new ApplyTaskHandler());
-        this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
-        this.taskQueue = this.disruptor.start();
+
+        disruptor = opts.getfSMCallerExecutorDisruptor();
+
+        taskQueue = disruptor.subscribe(groupId, new ApplyTaskHandler());
+
         if (this.nodeMetrics.getMetricRegistry() != null) {
             this.nodeMetrics.getMetricRegistry().register("jraft-fsm-caller-disruptor",
                 new DisruptorMetricSet(this.taskQueue));
@@ -207,6 +201,7 @@ public class FSMCallerImpl implements FSMCaller {
             this.shutdownLatch = latch;
             Utils.runInThread(this.node.getOptions().getCommonExecutor(), () -> this.taskQueue.publishEvent((task, sequence) -> {
                 task.reset();
+                task.groupId = groupId;
                 task.type = TaskType.SHUTDOWN;
                 task.shutdownLatch = latch;
             }));
@@ -225,6 +220,7 @@ public class FSMCallerImpl implements FSMCaller {
             LOG.warn("FSMCaller is stopped, can not apply new task.");
             return false;
         }
+
         if (!this.taskQueue.tryPublishEvent(tpl)) {
             setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
                 "FSMCaller is overload.")));
@@ -236,6 +232,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onCommitted(final long committedIndex) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.COMMITTED;
             task.committedIndex = committedIndex;
         });
@@ -248,6 +245,7 @@ public class FSMCallerImpl implements FSMCaller {
     void flush() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
         enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.FLUSH;
             task.shutdownLatch = latch;
         });
@@ -257,6 +255,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.SNAPSHOT_LOAD;
             task.done = done;
         });
@@ -265,6 +264,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onSnapshotSave(final SaveSnapshotClosure done) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.SNAPSHOT_SAVE;
             task.done = done;
         });
@@ -273,6 +273,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onLeaderStop(final Status status) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.LEADER_STOP;
             task.status = new Status(status);
         });
@@ -281,6 +282,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onLeaderStart(final long term) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.LEADER_START;
             task.term = term;
         });
@@ -289,6 +291,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onStartFollowing(final LeaderChangeContext ctx) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.START_FOLLOWING;
             task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
         });
@@ -297,6 +300,7 @@ public class FSMCallerImpl implements FSMCaller {
     @Override
     public boolean onStopFollowing(final LeaderChangeContext ctx) {
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.STOP_FOLLOWING;
             task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
         });
@@ -334,6 +338,7 @@ public class FSMCallerImpl implements FSMCaller {
         }
         final OnErrorClosure c = new OnErrorClosure(error);
         return enqueueTask((task, sequence) -> {
+            task.groupId = groupId;
             task.type = TaskType.ERROR;
             task.done = c;
         });
@@ -348,7 +353,7 @@ public class FSMCallerImpl implements FSMCaller {
     public synchronized void join() throws InterruptedException {
         if (this.shutdownLatch != null) {
             this.shutdownLatch.await();
-            this.disruptor.shutdown();
+            this.disruptor.unsubscribe(groupId);
             if (this.afterShutdown != null) {
                 this.afterShutdown.run(Status.OK());
                 this.afterShutdown = null;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 9d2df35..ed1d0b3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -16,6 +16,9 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,13 +33,6 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.jraft.Closure;
@@ -54,6 +50,8 @@ import org.apache.ignite.raft.jraft.closure.SynchronizedClosure;
 import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.Ballot;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
@@ -106,10 +104,7 @@ import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Describer;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.RepeatedTimer;
 import org.apache.ignite.raft.jraft.util.Requires;
@@ -192,7 +187,7 @@ public class NodeImpl implements Node, RaftServerService {
     /**
      * Disruptor to run node service
      */
-    private Disruptor<LogEntryAndClosure> applyDisruptor;
+    private StripedDisruptor<LogEntryAndClosure> applyDisruptor;
     private RingBuffer<LogEntryAndClosure> applyQueue;
 
     /**
@@ -251,13 +246,22 @@ public class NodeImpl implements Node, RaftServerService {
     /**
      * Node service event.
      */
-    private static class LogEntryAndClosure {
+    public static class LogEntryAndClosure implements GroupAware {
+        /** Raft group id. */
+        String groupId;
+
         LogEntry entry;
         Closure done;
         long expectedTerm;
         CountDownLatch shutdownLatch;
 
+        /** {@inheritDoc} */
+        @Override public String groupId() {
+            return groupId;
+        }
+
         public void reset() {
+            this.groupId = null;
             this.entry = null;
             this.done = null;
             this.expectedTerm = 0;
@@ -265,14 +269,6 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
-    private static class LogEntryAndClosureFactory implements EventFactory<LogEntryAndClosure> {
-
-        @Override
-        public LogEntryAndClosure newInstance() {
-            return new LogEntryAndClosure();
-        }
-    }
-
     /**
      * Event handler.
      */
@@ -281,8 +277,7 @@ public class NodeImpl implements Node, RaftServerService {
         private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());
 
         @Override
-        public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
-            throws Exception {
+        public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch) throws Exception {
             if (event.shutdownLatch != null) {
                 if (!this.tasks.isEmpty()) {
                     executeApplyingTasks(this.tasks);
@@ -512,10 +507,6 @@ public class NodeImpl implements Node, RaftServerService {
         }
     }
 
-    public NodeImpl() {
-        this(null, null);
-    }
-
     public NodeImpl(final String groupId, final PeerId serverId) {
         super();
         if (groupId != null) {
@@ -554,14 +545,16 @@ public class NodeImpl implements Node, RaftServerService {
         this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions);
         this.logManager = new LogManagerImpl();
         final LogManagerOptions opts = new LogManagerOptions();
+        opts.setGroupId(groupId);
         opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
         opts.setLogStorage(this.logStorage);
         opts.setConfigurationManager(this.configManager);
         opts.setNode(this);
         opts.setFsmCaller(this.fsmCaller);
         opts.setNodeMetrics(this.metrics);
-        opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
         opts.setRaftOptions(this.raftOptions);
+        opts.setLogManagerDisruptor(options.getLogManagerDisruptor());
+
         return this.logManager.init(opts);
     }
 
@@ -741,8 +734,10 @@ public class NodeImpl implements Node, RaftServerService {
         opts.setClosureQueue(this.closureQueue);
         opts.setNode(this);
         opts.setBootstrapId(bootstrapId);
-        opts.setDisruptorBufferSize(this.raftOptions.getDisruptorBufferSize());
         opts.setRaftMessagesFactory(raftOptions.getRaftMessagesFactory());
+        opts.setfSMCallerExecutorDisruptor(options.getfSMCallerExecutorDisruptor());
+        opts.setGroupId(groupId);
+
         return this.fsmCaller.init(opts);
     }
 
@@ -954,16 +949,10 @@ public class NodeImpl implements Node, RaftServerService {
 
         this.configManager = new ConfigurationManager();
 
-        this.applyDisruptor = DisruptorBuilder.<LogEntryAndClosure>newInstance() //
-            .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
-            .setEventFactory(new LogEntryAndClosureFactory()) //
-            .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-" + suffix, true)) //
-            .setProducerType(ProducerType.MULTI) //
-            .setWaitStrategy(new BlockingWaitStrategy()) //
-            .build();
-        this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
-        this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
-        this.applyQueue = this.applyDisruptor.start();
+        applyDisruptor = opts.getNodeApplyDisruptor();
+
+        applyQueue = applyDisruptor.subscribe(groupId, new LogEntryAndClosureHandler());
+
         if (this.metrics.getMetricRegistry() != null) {
             this.metrics.getMetricRegistry().register("jraft-node-impl-disruptor",
                 new DisruptorMetricSet(this.applyQueue));
@@ -1044,9 +1033,11 @@ public class NodeImpl implements Node, RaftServerService {
 
         this.readOnlyService = new ReadOnlyServiceImpl();
         final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions();
+        rosOpts.setGroupId(groupId);
         rosOpts.setFsmCaller(this.fsmCaller);
         rosOpts.setNode(this);
         rosOpts.setRaftOptions(this.raftOptions);
+        rosOpts.setReadOnlyServiceDisruptor(opts.getReadOnlyServiceDisruptor());
 
         if (!this.readOnlyService.init(rosOpts)) {
             LOG.error("Fail to init readOnlyService.");
@@ -1603,6 +1594,7 @@ public class NodeImpl implements Node, RaftServerService {
         try {
             final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
                 event.reset();
+                event.groupId = groupId;
                 event.done = task.getDone();
                 event.entry = entry;
                 event.expectedTerm = task.getExpectedTerm();
@@ -2800,7 +2792,10 @@ public class NodeImpl implements Node, RaftServerService {
                     this.shutdownLatch = latch;
 
                     Utils.runInThread(this.getOptions().getCommonExecutor(),
-                        () -> this.applyQueue.publishEvent((event, sequence) -> event.shutdownLatch = latch));
+                        () -> this.applyQueue.publishEvent((event, sequence) -> {
+                            event.groupId = groupId;
+                            event.shutdownLatch = latch;
+                        }));
                 }
                 if (this.timerManager != null) {
                     this.timerManager.shutdown();
@@ -2875,7 +2870,7 @@ public class NodeImpl implements Node, RaftServerService {
                 Replicator.join(this.wakingCandidate);
             }
             this.shutdownLatch.await();
-            this.applyDisruptor.shutdown();
+            this.applyDisruptor.unsubscribe(groupId);
             this.shutdownLatch = null;
         }
         if (this.fsmCaller != null) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index 8da944a..31e6f56 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -16,6 +16,10 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -25,19 +29,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import com.lmax.disruptor.BlockingWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.FSMCaller.LastAppliedLogIndexListener;
 import org.apache.ignite.raft.jraft.ReadOnlyService;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.ReadIndexState;
 import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -50,10 +49,7 @@ import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcResponseClosureAdapter;
 import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Bytes;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
 import org.apache.ignite.raft.jraft.util.ThreadHelper;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -64,10 +60,13 @@ import org.apache.ignite.raft.jraft.util.Utils;
 public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {
     private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
 
+    /** RAFT group id. */
+    private String groupId;
+
     /**
      * Disruptor to run readonly service.
      */
-    private Disruptor<ReadIndexEvent> readIndexDisruptor;
+    private StripedDisruptor<ReadIndexEvent> readIndexDisruptor;
 
     private RingBuffer<ReadIndexEvent> readIndexQueue;
 
@@ -90,11 +89,19 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
 
     private static final IgniteLogger LOG = IgniteLogger.forClass(ReadOnlyServiceImpl.class);
 
-    private static class ReadIndexEvent {
+    public static class ReadIndexEvent implements GroupAware {
+        /** Raft group id. */
+        String groupId;
+
         Bytes requestContext;
         ReadIndexClosure done;
         CountDownLatch shutdownLatch;
         long startTime;
+
+        /** {@inheritDoc} */
+        @Override public String groupId() {
+            return groupId;
+        }
     }
 
     private static class ReadIndexEventFactory implements EventFactory<ReadIndexEvent> {
@@ -241,23 +248,16 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
 
     @Override
     public boolean init(final ReadOnlyServiceOptions opts) {
+        this.groupId = opts.getGroupId();
         this.node = opts.getNode();
         this.nodeMetrics = this.node.getNodeMetrics();
         this.fsmCaller = opts.getFsmCaller();
         this.raftOptions = opts.getRaftOptions();
 
-        this.readIndexDisruptor = DisruptorBuilder.<ReadIndexEvent>newInstance() //
-            .setEventFactory(new ReadIndexEventFactory()) //
-            .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) //
-            .setThreadFactory(new NamedThreadFactory("JRaft-ReadOnlyService-Disruptor-" +
-                node.getOptions().getServerName() + "-" + node.getNodeId().toString(), true)) //
-            .setWaitStrategy(new BlockingWaitStrategy()) //
-            .setProducerType(ProducerType.MULTI) //
-            .build();
-        this.readIndexDisruptor.handleEventsWith(new ReadIndexEventHandler());
-        this.readIndexDisruptor
-            .setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
-        this.readIndexQueue = this.readIndexDisruptor.start();
+        readIndexDisruptor = opts.getReadOnlyServiceDisruptor();
+
+        readIndexQueue = readIndexDisruptor.subscribe(groupId, new ReadIndexEventHandler());
+
         if (this.nodeMetrics.getMetricRegistry() != null) {
             this.nodeMetrics.getMetricRegistry() //
                 .register("jraft-read-only-service-disruptor", new DisruptorMetricSet(this.readIndexQueue));
@@ -285,7 +285,10 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
         }
         this.shutdownLatch = new CountDownLatch(1);
         Utils.runInThread(this.node.getOptions().getCommonExecutor(),
-            () -> this.readIndexQueue.publishEvent((event, sequence) -> event.shutdownLatch = this.shutdownLatch));
+            () -> this.readIndexQueue.publishEvent((event, sequence) -> {
+                event.groupId = this.groupId;
+                event.shutdownLatch = this.shutdownLatch;
+            }));
     }
 
     @Override
@@ -293,7 +296,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
         if (this.shutdownLatch != null) {
             this.shutdownLatch.await();
         }
-        this.readIndexDisruptor.shutdown();
+        this.readIndexDisruptor.unsubscribe(groupId);
         resetPendingStatusError(new Status(RaftError.ESTOP, "Node is quit."));
     }
 
@@ -306,6 +309,7 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
 
         try {
             EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
+                event.groupId = this.groupId;
                 event.done = closure;
                 event.requestContext = new Bytes(reqCtx);
                 event.startTime = Utils.monotonicMs();
@@ -388,7 +392,10 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
     @OnlyForTest
     void flush() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.readIndexQueue.publishEvent((task, sequence) -> task.shutdownLatch = latch);
+        this.readIndexQueue.publishEvent((task, sequence) -> {
+            task.groupId = this.groupId;
+            task.shutdownLatch = latch;
+        });
         latch.await();
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
similarity index 91%
rename from modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java
rename to modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
index ddf8bb9..79aaa35 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/DisruptorBuilder.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/DisruptorBuilder.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.raft.jraft.util;
+package org.apache.ignite.raft.jraft.disruptor;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventFactory;
@@ -22,6 +22,8 @@ import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import java.util.concurrent.ThreadFactory;
+import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.util.Requires;
 
 /**
  * A builder to build a disruptor instance.
@@ -59,10 +61,10 @@ public class DisruptorBuilder<T> {
     }
 
     public ThreadFactory getThreadFactory() {
-        return this.threadFactory;
+        return threadFactory;
     }
 
-    public DisruptorBuilder<T> setThreadFactory(final ThreadFactory threadFactory) {
+    public DisruptorBuilder<T> setThreadFactory(ThreadFactory threadFactory) {
         this.threadFactory = threadFactory;
         return this;
     }
@@ -88,6 +90,7 @@ public class DisruptorBuilder<T> {
     public Disruptor<T> build() {
         Requires.requireNonNull(this.ringBufferSize, " Ring buffer size not set");
         Requires.requireNonNull(this.eventFactory, "Event factory not set");
+
         return new Disruptor<>(this.eventFactory, this.ringBufferSize, this.threadFactory, this.producerType,
             this.waitStrategy);
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java
new file mode 100644
index 0000000..542b3f7
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/GroupAware.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.disruptor;
+
+/**
+ * Implemented by objects (events) that can provide the id of the group they belong to.
+ * The group id allows the Striped disruptor to route an event to the handler of its group.
+ */
+public interface GroupAware {
+    /**
+     * Gets the id of the group this object belongs to.
+     *
+     * @return Group id.
+     */
+    String groupId();
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
new file mode 100644
index 0000000..40f99e8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.disruptor;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ignite.lang.LoggerMessageHelper.format;
+
+/**
+ * Striped Disruptor is a set of queues, each of which (a stripe) processes events of several independent groups.
+ * It needs fewer threads than there are groups while giving the same sequential guarantees and close performance.
+ *
+ * @param <T> Event type. The event must implement the {@link GroupAware} interface.
+ */
+public class StripedDisruptor<T extends GroupAware> {
+    /** The logger. */
+    private static final Logger LOG = LoggerFactory.getLogger(StripedDisruptor.class);
+
+    /** Array of disruptors, one per stripe (indexed by the stripe number). */
+    private final Disruptor<T>[] disruptors;
+
+    /** Array of ring buffers. Element {@code i} is the ring buffer of {@code disruptors[i]}. */
+    private final RingBuffer<T>[] queues;
+
+    /** Disruptor event handlers. Element {@code i} is the event handler of {@code disruptors[i]}. */
+    private final ArrayList<StripeEntryHandler> eventHandlers;
+
+    /** Disruptor exception handlers. Element {@code i} is the exception handler of {@code disruptors[i]}. */
+    private final ArrayList<StripeExceptionHandler> exceptionHandlers;
+
+    /** Number of stripes. */
+    private final int stripes;
+
+    /** The Striped disruptor name. */
+    private final String name;
+
+    /**
+     * @param name Name of the Striped disruptor.
+     * @param bufferSize Buffer size for each Disruptor.
+     * @param eventFactory Event factory for the Striped disruptor.
+     * @param stripes Number of stripes; one Disruptor is built and started per stripe.
+     */
+    public StripedDisruptor(String name, int bufferSize, EventFactory<T> eventFactory, int stripes) {
+        disruptors = new Disruptor[stripes];
+        queues = new RingBuffer[stripes];
+        eventHandlers = new ArrayList<>(stripes);
+        exceptionHandlers = new ArrayList<>(stripes);
+        this.stripes = stripes;
+        this.name = name;
+
+        for (int i = 0; i < stripes; i++) {
+            String stripeName = format("{}_stripe_{}-", name, i);
+
+            Disruptor<T> disruptor = DisruptorBuilder.<T>newInstance()
+                .setRingBufferSize(bufferSize)
+                .setEventFactory(eventFactory)
+                .setThreadFactory(new NamedThreadFactory(stripeName, true))
+                .setProducerType(ProducerType.MULTI)
+                .setWaitStrategy(new BlockingWaitStrategy())
+                .build();
+
+            eventHandlers.add(new StripeEntryHandler());
+            exceptionHandlers.add(new StripeExceptionHandler(name));
+
+            disruptor.handleEventsWith(eventHandlers.get(i));
+            disruptor.setDefaultExceptionHandler(exceptionHandlers.get(i));
+
+            queues[i] = disruptor.start();
+            disruptors[i] = disruptor;
+        }
+
+        LOG.info("Striped disruptor was started [name={}]", name);
+    }
+
+    /**
+     * Shuts down all nested disruptors.
+     */
+    public void shutdown() {
+        for (int i = 0; i < stripes; i++)
+            disruptors[i].shutdown();
+
+        LOG.info("Striped disruptor stopped [name={}]", name);
+    }
+
+    /**
+     * Subscribes an event handler to one stripe of the Striped disruptor.
+     * The stripe is determined by a group id.
+     *
+     * @param group Group id.
+     * @param handler Event handler for the group specified.
+     * @return Disruptor queue appropriate to the group.
+     */
+    public RingBuffer<T> subscribe(String group, EventHandler<T> handler) {
+        return subscribe(group, handler, null);
+    }
+
+    /**
+     * Subscribes an event handler and an exception handler to one stripe of the Striped disruptor.
+     * The stripe is determined by a group id.
+     *
+     * @param group Group id.
+     * @param handler Event handler for the group specified.
+     * @param exceptionHandler Exception handler for the group specified, or {@code null} to register none.
+     * @return Disruptor queue appropriate to the group.
+     */
+    public RingBuffer<T> subscribe(String group, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
+        eventHandlers.get(getStripe(group)).subscribe(group, handler);
+
+        if (exceptionHandler != null)
+            exceptionHandlers.get(getStripe(group)).subscribe(group, exceptionHandler);
+
+        LOG.info("Consumer subscribed [poolName={}, group={}]", name, group);
+
+        return queues[getStripe(group)];
+    }
+
+    /**
+     * Unsubscribes a group from the Striped disruptor (removes both its event and exception handlers).
+     *
+     * @param group Group id.
+     */
+    public void unsubscribe(String group) {
+        eventHandlers.get(getStripe(group)).unsubscribe(group);
+        exceptionHandlers.get(getStripe(group)).unsubscribe(group);
+
+        LOG.info("Consumer unsubscribe [poolName={}, group={}]", name, group);
+    }
+
+    /**
+     * Determines a stripe by a group id (hash code modulo the stripe count) and returns a stripe number.
+     *
+     * @param group Group id.
+     * @return Stripe of the Striped disruptor.
+     */
+    private int getStripe(String group) {
+        return Math.abs(group.hashCode() % stripes);
+    }
+
+    /**
+     * Determines a Disruptor queue by a group id.
+     *
+     * @param groupId Group id.
+     * @return Disruptor queue appropriate to the group.
+     */
+    public RingBuffer<T> queue(String groupId) {
+        return queues[getStripe(groupId)];
+    }
+
+    /**
+     * Event handler for a stripe of the Striped disruptor.
+     * It routes an event to the event handler subscribed for the event's group.
+     */
+    private class StripeEntryHandler implements EventHandler<T> {
+        private final ConcurrentHashMap<String, EventHandler<T>> subscrivers;
+
+        /**
+         * The constructor.
+         */
+        StripeEntryHandler() {
+            subscrivers = new ConcurrentHashMap<>();
+        }
+
+        /**
+         * Subscribes a group to the events that belong to it.
+         *
+         * @param group Group id.
+         * @param handler Event handler for the group specified.
+         */
+        void subscribe(String group, EventHandler<T> handler) {
+            subscrivers.put(group, handler);
+        }
+
+        /**
+         * Unsubscribes a group from all events.
+         *
+         * @param group Group id.
+         */
+        void unsubscribe(String group) {
+            subscrivers.remove(group);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onEvent(T event, long sequence, boolean endOfBatch) throws Exception {
+            EventHandler<T> handler = subscrivers.get(event.groupId());
+
+            assert handler != null : format("Group of the event is unsupported [group={}, event={}]", event.groupId(), event);
+
+            handler.onEvent(event, sequence, endOfBatch);
+        }
+    }
+
+    /**
+     * Striped disruptor exception handler.
+     * It logs an exception when one has occurred and routes it to the handler of the event's group, if any.
+     */
+    private class StripeExceptionHandler implements ExceptionHandler<T> {
+        /** Name of the Disruptor instance. */
+        private final String name;
+
+        /** Exception handlers per group. */
+        private final ConcurrentHashMap<String, BiConsumer<T, Throwable>> subscrivers;
+
+        /**
+         * @param name Name of the Disruptor instance.
+         */
+        StripeExceptionHandler(String name) {
+            this.name = name;
+            this.subscrivers = new ConcurrentHashMap<>();
+        }
+
+        /**
+         * Subscribes a group to exceptions that might happen while handling an event of the group.
+         *
+         * @param group Group id.
+         * @param handler Exception handler.
+         */
+        void subscribe(String group, BiConsumer<T, Throwable> handler) {
+            subscrivers.put(group, handler);
+        }
+
+        /**
+         * Unsubscribes a group from all exceptions.
+         *
+         * @param group Group id.
+         */
+        void unsubscribe(String group) {
+            subscrivers.remove(group);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void handleOnStartException(Throwable ex) {
+            LOG.error("Fail to start disruptor [name={}]", name, ex);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void handleOnShutdownException(Throwable ex) {
+            LOG.error("Fail to shutdown disruptor [name={}]", name, ex);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void handleEventException(Throwable ex, long sequence, T event) {
+            BiConsumer<T, Throwable> handler = subscrivers.get(event.groupId());
+
+            LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", name, event, handler != null, ex);
+
+            if (handler != null)
+                handler.accept(event, ex);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return format("{} [name={}]", StripedDisruptor.class.getSimpleName(), name);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
index 913b5a2..dce9098 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/FSMCallerOptions.java
@@ -20,7 +20,9 @@ import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.closure.ClosureQueue;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.LogId;
 import org.apache.ignite.raft.jraft.storage.LogManager;
 
@@ -28,6 +30,9 @@ import org.apache.ignite.raft.jraft.storage.LogManager;
  * FSM caller options.
  */
 public class FSMCallerOptions {
+    /** Raft group id. */
+    private String groupId;
+
     private LogManager logManager;
     private StateMachine fsm;
     private Closure afterShutdown;
@@ -35,17 +40,22 @@ public class FSMCallerOptions {
     private ClosureQueue closureQueue;
     private NodeImpl node;
     private RaftMessagesFactory raftMessagesFactory;
-    /**
-     * disruptor buffer size.
-     */
-    private int disruptorBufferSize = 1024;
+    private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
 
-    public int getDisruptorBufferSize() {
-        return this.disruptorBufferSize;
+    public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+        return fSMCallerExecutorDisruptor;
     }
 
-    public void setDisruptorBufferSize(int disruptorBufferSize) {
-        this.disruptorBufferSize = disruptorBufferSize;
+    public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+        this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
     }
 
     public NodeImpl getNode() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
index d22d317..32e317e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/LogManagerOptions.java
@@ -20,22 +20,43 @@ import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
 import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 
 /**
  * Options for log manager.
  */
 public class LogManagerOptions {
+    /** Raft group id. */
+    private String groupId;
+
     private Node node;
     private LogStorage logStorage;
     private ConfigurationManager configurationManager;
     private FSMCaller fsmCaller;
-    private int disruptorBufferSize = 1024;
     private RaftOptions raftOptions;
     private NodeMetrics nodeMetrics;
     private LogEntryCodecFactory logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+    private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+        return logManagerDisruptor;
+    }
+
+    public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+        this.logManagerDisruptor = logManagerDisruptor;
+    }
 
     public LogEntryCodecFactory getLogEntryCodecFactory() {
         return this.logEntryCodecFactory;
@@ -61,14 +82,6 @@ public class LogManagerOptions {
         this.raftOptions = raftOptions;
     }
 
-    public int getDisruptorBufferSize() {
-        return this.disruptorBufferSize;
-    }
-
-    public void setDisruptorBufferSize(final int disruptorBufferSize) {
-        this.disruptorBufferSize = disruptorBufferSize;
-    }
-
     public LogStorage getLogStorage() {
         return this.logStorage;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 570a096..94c68e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -35,6 +35,9 @@ import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
  * Node options.
  */
 public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
+    /** This value is used by default to determine the count of stripes in the striped queue. */
+    public static final int DEFAULT_STRIPES = Utils.cpus() * 2;
+
     // A follower would become a candidate if it doesn't receive any message
     // from the leader in |election_timeout_ms| milliseconds
     // Default: 1000 (1s)
@@ -198,12 +201,29 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     /** Server name. */
     private String serverName;
 
+    /** Number of disruptors (stripes) that will handle the RAFT server. */
+    private int stripes = DEFAULT_STRIPES;
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
         raftOptions.setRaftClientMessagesFactory(getRaftClientMessagesFactory());
     }
 
     /**
+     * @return Stripe count.
+     */
+    public int getStripes() {
+        return stripes;
+    }
+
+    /**
+     * @param stripes Stripe count.
+     */
+    public void setStripes(int stripes) {
+        this.stripes = stripes;
+    }
+
+    /**
      * The rpc client.
      */
     public JRaftServiceFactory getServiceFactory() {
@@ -507,6 +527,10 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         nodeOptions.setServerName(this.getServerName());
         nodeOptions.setScheduler(this.getScheduler());
         nodeOptions.setClientExecutor(this.getClientExecutor());
+        nodeOptions.setNodeApplyDisruptor(this.getNodeApplyDisruptor());
+        nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor());
+        nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor());
+        nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor());
 
         return nodeOptions;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
index aa963c3..cb1ea95 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ReadOnlyServiceOptions.java
@@ -18,15 +18,36 @@ package org.apache.ignite.raft.jraft.option;
 
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 
 /**
  * Read-Only service options.
  */
 public class ReadOnlyServiceOptions {
+    /** Raft group id. */
+    private String groupId;
 
     private RaftOptions raftOptions;
     private NodeImpl node;
     private FSMCaller fsmCaller;
+    private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+        return readOnlyServiceDisruptor;
+    }
+
+    public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+        this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+    }
 
     public NodeImpl getNode() {
         return node;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
index 5dc3fb4..b1af8a1 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RpcOptions.java
@@ -16,11 +16,16 @@
  */
 package org.apache.ignite.raft.jraft.option;
 
-import java.util.concurrent.ExecutorService;
 import com.codahale.metrics.MetricRegistry;
+import java.util.concurrent.ExecutorService;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
+import org.apache.ignite.raft.jraft.core.NodeImpl;
+import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.rpc.RpcClient;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 
 public class RpcOptions {
     /** Raft message factory. */
@@ -64,6 +69,50 @@ public class RpcOptions {
      */
     private ExecutorService clientExecutor;
 
+    /** Striped disruptor for FSMCaller service. The queue serves Append entry requests in the RAFT state machine. */
+    private StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor;
+
+    /** Striped disruptor for Node apply service. */
+    private StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor;
+
+    /** Striped disruptor for Read only service. */
+    private StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor;
+
+    /** Striped disruptor for Log manager service. */
+    private StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor;
+
+    public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
+        return fSMCallerExecutorDisruptor;
+    }
+
+    public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
+        this.fSMCallerExecutorDisruptor = fSMCallerExecutorDisruptor;
+    }
+
+    public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
+        return nodeApplyDisruptor;
+    }
+
+    public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
+        this.nodeApplyDisruptor = nodeApplyDisruptor;
+    }
+
+    public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
+        return readOnlyServiceDisruptor;
+    }
+
+    public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
+        this.readOnlyServiceDisruptor = readOnlyServiceDisruptor;
+    }
+
+    public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
+        return logManagerDisruptor;
+    }
+
+    public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
+        this.logManagerDisruptor = logManagerDisruptor;
+    }
+
     /**
      * Metric registry for RPC services, user should not use this field.
      */
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index c3beacf..16f2b3a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -16,23 +16,18 @@
  */
 package org.apache.ignite.raft.jraft.storage.impl;
 
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.RingBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.EventTranslator;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.dsl.Disruptor;
-import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.raft.jraft.FSMCaller;
 import org.apache.ignite.raft.jraft.Status;
@@ -40,6 +35,8 @@ import org.apache.ignite.raft.jraft.conf.Configuration;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.GroupAware;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
@@ -56,10 +53,7 @@ import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogManager;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.util.ArrayDeque;
-import org.apache.ignite.raft.jraft.util.DisruptorBuilder;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
-import org.apache.ignite.raft.jraft.util.LogExceptionHandler;
-import org.apache.ignite.raft.jraft.util.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.SegmentList;
 import org.apache.ignite.raft.jraft.util.ThreadHelper;
@@ -73,6 +67,9 @@ public class LogManagerImpl implements LogManager {
 
     private static final IgniteLogger LOG = IgniteLogger.forClass(LogManagerImpl.class);
 
+    /** Raft group id. */
+    String groupId;
+
     private LogStorage logStorage;
     private ConfigurationManager configManager;
     private FSMCaller fsmCaller;
@@ -89,7 +86,7 @@ public class LogManagerImpl implements LogManager {
     private volatile long lastLogIndex;
     private volatile LogId lastSnapshotId = new LogId(0, 0);
     private final Map<Long, WaitMeta> waitMap = new HashMap<>();
-    private Disruptor<StableClosureEvent> disruptor;
+    private StripedDisruptor<StableClosureEvent> disruptor;
     private RingBuffer<StableClosureEvent> diskQueue;
     private RaftOptions raftOptions;
     private volatile CountDownLatch shutDownLatch;
@@ -106,23 +103,25 @@ public class LogManagerImpl implements LogManager {
         LAST_LOG_ID // get last log id
     }
 
-    private static class StableClosureEvent {
+    public static class StableClosureEvent implements GroupAware {
+        /** Raft group id. */
+        String groupId;
+
         StableClosure done;
         EventType type;
 
+        /** {@inheritDoc} */
+        @Override public String groupId() {
+            return groupId;
+        }
+
         void reset() {
+            this.groupId = null;
             this.done = null;
             this.type = null;
         }
     }
 
-    private static class StableClosureEventFactory implements EventFactory<StableClosureEvent> {
-        @Override
-        public StableClosureEvent newInstance() {
-            return new StableClosureEvent();
-        }
-    }
-
     /**
      * Waiter metadata
      */
@@ -175,6 +174,7 @@ public class LogManagerImpl implements LogManager {
             this.logStorage = opts.getLogStorage();
             this.configManager = opts.getConfigurationManager();
             this.nodeOptions = opts.getNode().getOptions();
+            this.groupId = opts.getGroupId();
 
             LogStorageOptions lsOpts = new LogStorageOptions();
             lsOpts.setConfigurationManager(this.configManager);
@@ -188,22 +188,11 @@ public class LogManagerImpl implements LogManager {
             this.lastLogIndex = this.logStorage.getLastLogIndex();
             this.diskId = new LogId(this.lastLogIndex, getTermFromLogStorage(this.lastLogIndex));
             this.fsmCaller = opts.getFsmCaller();
-            this.disruptor = DisruptorBuilder.<StableClosureEvent>newInstance() //
-                .setEventFactory(new StableClosureEventFactory()) //
-                .setRingBufferSize(opts.getDisruptorBufferSize()) //
-                .setThreadFactory(new NamedThreadFactory("JRaft-LogManager-Disruptor-" +
-                    opts.getNode().getOptions().getServerName() + "-" + opts.getNode().getNodeId(), true)) //
-                .setProducerType(ProducerType.MULTI) //
-                /*
-                 *  Use timeout strategy in log manager. If timeout happens, it will called reportError to halt the node.
-                 */
-                .setWaitStrategy(new TimeoutBlockingWaitStrategy(
-                    this.raftOptions.getDisruptorPublishEventWaitTimeoutSecs(), TimeUnit.SECONDS)) //
-                .build();
-            this.disruptor.handleEventsWith(new StableClosureEventHandler());
-            this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(this.getClass().getSimpleName(),
-                (event, ex) -> reportError(-1, "LogManager handle event error")));
-            this.diskQueue = this.disruptor.start();
+            this.disruptor = opts.getLogManagerDisruptor();
+
+            this.diskQueue = disruptor.subscribe(groupId, new StableClosureEventHandler(),
+                (event, ex) -> reportError(-1, "LogManager handle event error"));
+
             if (this.nodeMetrics.getMetricRegistry() != null) {
                 this.nodeMetrics.getMetricRegistry().register("jraft-log-manager-disruptor",
                     new DisruptorMetricSet(this.diskQueue));
@@ -219,6 +208,7 @@ public class LogManagerImpl implements LogManager {
         this.shutDownLatch = new CountDownLatch(1);
         Utils.runInThread(nodeOptions.getCommonExecutor(), () -> this.diskQueue.publishEvent((event, sequence) -> {
             event.reset();
+            event.groupId = groupId;
             event.type = EventType.SHUTDOWN;
         }));
     }
@@ -229,7 +219,7 @@ public class LogManagerImpl implements LogManager {
             return;
         }
         this.shutDownLatch.await();
-        this.disruptor.shutdown();
+        this.disruptor.unsubscribe(groupId);
     }
 
     @Override
@@ -328,6 +318,7 @@ public class LogManagerImpl implements LogManager {
             int retryTimes = 0;
             final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
                 event.reset();
+                event.groupId = groupId;
                 event.type = EventType.OTHER;
                 event.done = done;
             };
@@ -363,6 +354,7 @@ public class LogManagerImpl implements LogManager {
         }
         if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
             event.reset();
+            event.groupId = groupId;
             event.type = type;
             event.done = done;
         })) {
@@ -376,6 +368,7 @@ public class LogManagerImpl implements LogManager {
             Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
             return true;
         }
+
         return this.diskQueue.tryPublishEvent(translator);
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java
deleted file mode 100644
index 0b9b4b6..0000000
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/LogExceptionHandler.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.raft.jraft.util;
-
-import com.lmax.disruptor.ExceptionHandler;
-import org.apache.ignite.lang.IgniteLogger;
-
-/**
- * Disruptor exception handler.
- */
-public final class LogExceptionHandler<T> implements ExceptionHandler<T> {
-
-    private static final IgniteLogger LOG = IgniteLogger.forClass(LogExceptionHandler.class);
-
-    public interface OnEventException<T> {
-
-        void onException(T event, Throwable ex);
-    }
-
-    private final String name;
-    private final OnEventException<T> onEventException;
-
-    public LogExceptionHandler(String name) {
-        this(name, null);
-    }
-
-    public LogExceptionHandler(String name, OnEventException<T> onEventException) {
-        this.name = name;
-        this.onEventException = onEventException;
-    }
-
-    @Override
-    public void handleOnStartException(Throwable ex) {
-        LOG.error("Fail to start {} disruptor", ex, this.name);
-    }
-
-    @Override
-    public void handleOnShutdownException(Throwable ex) {
-        LOG.error("Fail to shutdown {}r disruptor", ex, this.name);
-
-    }
-
-    @Override
-    public void handleEventException(Throwable ex, long sequence, T event) {
-        LOG.error("Handle {} disruptor event error, event is {}", ex, this.name, event);
-        if (this.onEventException != null) {
-            this.onEventException.onException(event, ex);
-        }
-    }
-}
\ No newline at end of file
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index ae0bd6a..b0d56ad 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -25,13 +25,12 @@ import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.closure.ClosureQueueImpl;
 import org.apache.ignite.raft.jraft.closure.LoadSnapshotClosure;
 import org.apache.ignite.raft.jraft.closure.SaveSnapshotClosure;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
 import org.apache.ignite.raft.jraft.entity.EnumOutter.ErrorType;
 import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.LogId;
-import org.apache.ignite.raft.jraft.entity.NodeId;
-import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.error.RaftException;
@@ -67,6 +66,9 @@ public class FSMCallerTest {
     private LogManager logManager;
     private ClosureQueueImpl closureQueue;
 
+    /** Disruptor for this service test. */
+    private StripedDisruptor disruptor;
+
     @BeforeEach
     public void setup() {
         this.fsmCaller = new FSMCallerImpl();
@@ -75,7 +77,6 @@ public class FSMCallerTest {
         this.closureQueue = new ClosureQueueImpl(options);
         opts = new FSMCallerOptions();
         Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
-        Mockito.when(this.node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost", 8082)));
         Mockito.when(this.node.getOptions()).thenReturn(options);
         opts.setNode(this.node);
         opts.setFsm(this.fsm);
@@ -83,6 +84,11 @@ public class FSMCallerTest {
         opts.setBootstrapId(new LogId(10, 1));
         opts.setClosureQueue(this.closureQueue);
         opts.setRaftMessagesFactory(new RaftMessagesFactory());
+        opts.setGroupId("TestSrv");
+        opts.setfSMCallerExecutorDisruptor(disruptor = new StripedDisruptor<>("TestFSMDisruptor",
+            1024,
+            () -> new FSMCallerImpl.ApplyTask(),
+            1));
         assertTrue(this.fsmCaller.init(opts));
     }
 
@@ -91,6 +97,7 @@ public class FSMCallerTest {
         if (this.fsmCaller != null) {
             this.fsmCaller.shutdown();
             this.fsmCaller.join();
+            disruptor.shutdown();
         }
     }
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 2586420..53d9f98 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.ReadIndexState;
@@ -67,6 +68,9 @@ public class ReadOnlyServiceTest {
     @Mock
     private FSMCaller fsmCaller;
 
+    /** Disruptor for this service test. */
+    private StripedDisruptor disruptor;
+
     @BeforeEach
     public void setup() {
         this.readOnlyServiceImpl = new ReadOnlyServiceImpl();
@@ -76,6 +80,11 @@ public class ReadOnlyServiceTest {
         opts.setFsmCaller(this.fsmCaller);
         opts.setNode(this.node);
         opts.setRaftOptions(raftOptions);
+        opts.setGroupId("TestSrv");
+        opts.setReadOnlyServiceDisruptor(disruptor = new StripedDisruptor<>("TestReadOnlyServiceDisruptor",
+            1024,
+            () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+            1));
         NodeOptions nodeOptions = new NodeOptions();
         nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
         nodeOptions.setClientExecutor(JRaftUtils.createClientExecutor(nodeOptions, "unittest"));
@@ -93,6 +102,7 @@ public class ReadOnlyServiceTest {
     public void teardown() throws Exception {
         this.readOnlyServiceImpl.shutdown();
         this.readOnlyServiceImpl.join();
+        disruptor.shutdown();
     }
 
     @Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 868d4f4..369b6b8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -46,12 +47,14 @@ import org.apache.ignite.raft.jraft.Node;
 import org.apache.ignite.raft.jraft.NodeManager;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer;
 import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
 import org.apache.ignite.raft.jraft.storage.SnapshotThrottle;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
 import org.apache.ignite.raft.jraft.test.TestUtils;
 import org.apache.ignite.raft.jraft.util.Endpoint;
 import org.jetbrains.annotations.Nullable;
@@ -85,6 +88,26 @@ public class TestCluster {
     private final Lock lock = new ReentrantLock();
     private final Consumer<NodeOptions> optsClo;
 
+    /**
+     * FSM caller disruptors, keyed by the endpoint of the RAFT server that uses them.
+     */
+    private final HashMap<Endpoint, StripedDisruptor<FSMCallerImpl.ApplyTask>> fsmCallerDusruptors = new HashMap<>();
+
+    /**
+     * Node apply disruptors, keyed by the endpoint of the RAFT server that uses them.
+     */
+    private final HashMap<Endpoint, StripedDisruptor<NodeImpl.LogEntryAndClosure>> nodeDisruptors = new HashMap<>();
+
+    /**
+     * Read-only service disruptors, keyed by the endpoint of the RAFT server that uses them.
+     */
+    private final HashMap<Endpoint, StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent>> readOnlyServiceDisruptors = new HashMap<>();
+
+    /**
+     * Log manager disruptors, keyed by the endpoint of the RAFT server that uses them.
+     */
+    private final HashMap<Endpoint, StripedDisruptor<LogManagerImpl.StableClosureEvent>> logManagerDisruptors = new HashMap<>();
+
     private JRaftServiceFactory raftServiceFactory = new TestJRaftServiceFactory();
 
     private LinkedHashSet<PeerId> learners;
@@ -206,6 +229,30 @@ public class TestCluster {
             nodeOptions.setSnapshotUri(serverDataPath + File.separator + "snapshot");
             nodeOptions.setElectionPriority(priority);
 
+            nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDusruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+                "JRaft-FSMCaller-Disruptor_TestCluster",
+                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+                () -> new FSMCallerImpl.ApplyTask(),
+                nodeOptions.getStripes())));
+
+            nodeOptions.setNodeApplyDisruptor(nodeDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+                "JRaft-NodeImpl-Disruptor_TestCluster",
+                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+                () -> new NodeImpl.LogEntryAndClosure(),
+                nodeOptions.getStripes())));
+
+            nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+                "JRaft-ReadOnlyService-Disruptor_TestCluster",
+                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+                () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
+                nodeOptions.getStripes())));
+
+            nodeOptions.setLogManagerDisruptor(logManagerDisruptors.computeIfAbsent(listenAddr, endpoint -> new StripedDisruptor<>(
+                "JRaft-LogManager-Disruptor_TestCluster",
+                nodeOptions.getRaftOptions().getDisruptorBufferSize(),
+                () -> new LogManagerImpl.StableClosureEvent(),
+                nodeOptions.getStripes())));
+
             final MockStateMachine fsm = new MockStateMachine(listenAddr);
             nodeOptions.setFsm(fsm);
 
@@ -319,6 +366,11 @@ public class TestCluster {
         final List<Endpoint> addrs = getAllNodes();
         for (final Endpoint addr : addrs)
             stop(addr);
+
+        fsmCallerDusruptors.values().forEach(StripedDisruptor::shutdown);
+        nodeDisruptors.values().forEach(StripedDisruptor::shutdown);
+        readOnlyServiceDisruptors.values().forEach(StripedDisruptor::shutdown);
+        logManagerDisruptors.values().forEach(StripedDisruptor::shutdown);
     }
 
     public void clean(final Endpoint listenAddr) {
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index ab8211a..04c36df 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -26,11 +26,10 @@ import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
 import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
 import org.apache.ignite.raft.jraft.core.NodeMetrics;
+import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.EnumOutter;
 import org.apache.ignite.raft.jraft.entity.LogEntry;
 import org.apache.ignite.raft.jraft.entity.LogId;
-import org.apache.ignite.raft.jraft.entity.NodeId;
-import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.entity.RaftOutter;
 import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
 import org.apache.ignite.raft.jraft.option.LogManagerOptions;
@@ -72,6 +71,9 @@ public class LogManagerTest extends BaseStorageTest {
 
     private LogStorage logStorage;
 
+    /** Disruptor for this service test. */
+    private StripedDisruptor disruptor;
+
     @BeforeEach
     public void setup() throws Exception {
         this.confManager = new ConfigurationManager();
@@ -80,7 +82,6 @@ public class LogManagerTest extends BaseStorageTest {
         this.logManager = new LogManagerImpl();
         final LogManagerOptions opts = new LogManagerOptions();
 
-        Mockito.when(node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost", 8082)));
         NodeOptions nodeOptions = new NodeOptions();
         nodeOptions.setCommonExecutor(JRaftUtils.createExecutor("test-executor", Utils.cpus()));
         Mockito.when(node.getOptions()).thenReturn(nodeOptions);
@@ -92,6 +93,11 @@ public class LogManagerTest extends BaseStorageTest {
         opts.setNodeMetrics(new NodeMetrics(false));
         opts.setLogStorage(this.logStorage);
         opts.setRaftOptions(raftOptions);
+        opts.setGroupId("TestSrv");
+        opts.setLogManagerDisruptor(disruptor = new StripedDisruptor<>("TestLogManagerDisruptor",
+            1024,
+            () -> new LogManagerImpl.StableClosureEvent(),
+            1));
         assertTrue(this.logManager.init(opts));
     }
 
@@ -102,6 +108,7 @@ public class LogManagerTest extends BaseStorageTest {
     @AfterEach
     public void teardown() throws Exception {
         this.logStorage.shutdown();
+        disruptor.shutdown();
     }
 
     @Test