You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2023/08/07 15:51:19 UTC

[ignite-3] branch main updated: IGNITE-19960 Sync with JRaft repo - Fixes #2321.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 606f17fc3c IGNITE-19960 Sync with JRaft repo - Fixes #2321.
606f17fc3c is described below

commit 606f17fc3c0ffa71c4e2caed9bd228735516b404
Author: Mirza Aliev <al...@gmail.com>
AuthorDate: Mon Aug 7 18:50:54 2023 +0300

    IGNITE-19960 Sync with JRaft repo - Fixes #2321.
    
    Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  28 +++---
 .../ignite/raft/jraft/core/FSMCallerImpl.java      |   6 +-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  91 +++++++++++-------
 .../raft/jraft/core/ReadOnlyServiceImpl.java       |  53 +++++++----
 .../apache/ignite/raft/jraft/core/Replicator.java  |  19 +++-
 .../apache/ignite/raft/jraft/entity/LogEntry.java  |  25 +++++
 .../ignite/raft/jraft/entity/ReadIndexStatus.java  |   7 ++
 .../OverloadException.java}                        |  41 ++++----
 .../ignite/raft/jraft/option/ApplyTaskMode.java    |  28 ++++++
 .../ignite/raft/jraft/option/NodeOptions.java      |  71 ++++++++------
 .../ignite/raft/jraft/option/RaftOptions.java      |  19 ++++
 .../apache/ignite/raft/jraft/rpc/RpcRequests.java  |   2 +-
 .../raft/jraft/rpc/impl/AbstractClientService.java |   2 +-
 .../ignite/raft/jraft/storage/LogManager.java      |   8 ++
 .../raft/jraft/storage/impl/LogManagerImpl.java    | 106 +++++++++++----------
 .../raft/jraft/storage/impl/RocksDBLogStorage.java |  34 ++++---
 .../storage/snapshot/SnapshotExecutorImpl.java     |   2 +-
 .../snapshot/local/LocalSnapshotStorage.java       |   2 +-
 .../org/apache/ignite/raft/jraft/util/Utils.java   |  11 +++
 .../raft/jraft/core/ReadOnlyServiceTest.java       |  48 ++++++++++
 .../ignite/raft/jraft/entity/LogEntryTest.java     |  36 +++++++
 .../raft/jraft/storage/impl/LogManagerTest.java    |   7 ++
 .../ignite/raft/jraft/core/ExpectClosure.java      |  29 ++++--
 .../ignite/raft/jraft/core/MockStateMachine.java   |   4 +-
 24 files changed, 472 insertions(+), 207 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 72e5dfa847..8d7cbc77a0 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -1478,7 +1478,7 @@ public class ItNodeTest {
         assertTrue(cluster.stop(leader.getNodeId().getPeerId()));
 
         assertFalse(followers.isEmpty());
-        sendTestTaskAndWait("follower apply ", followers.get(0), -1); // Should fail, because no leader.
+        int success = sendTestTaskAndWait("follower apply ", followers.get(0), 10, -1); // Should fail, because no leader.
 
         stopBlockingMessagesOnFollowers(followers);
 
@@ -1517,7 +1517,7 @@ public class ItNodeTest {
         cluster.ensureSame();
 
         for (MockStateMachine fsm : cluster.getFsms())
-            assertEquals(30, fsm.getLogs().size());
+            assertEquals(30 + success, fsm.getLogs().size());
     }
 
     @Test
@@ -3178,7 +3178,7 @@ public class ItNodeTest {
 
         TestPeer peer0 = new TestPeer(testInfo, TestUtils.INIT_PORT);
         peers.add(peer0);
-        cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0), testInfo);
+        cluster = new TestCluster("testChangePeersAddMultiNodes", dataPath, Collections.singletonList(peer0), testInfo);
         assertTrue(cluster.start(peer0));
 
         Node leader = cluster.waitAndGetLeader();
@@ -3345,7 +3345,7 @@ public class ItNodeTest {
         // start cluster
         List<TestPeer> peers = new ArrayList<>();
         peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
-        cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+        cluster = new TestCluster("testChangePeersChaosWithSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
         assertTrue(cluster.start(peers.get(0), false, 2));
         // start other peers
         for (int i = 1; i < 10; i++) {
@@ -3390,7 +3390,7 @@ public class ItNodeTest {
         // start cluster
         List<TestPeer> peers = new ArrayList<>();
         peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
-        cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+        cluster = new TestCluster("testChangePeersChaosWithoutSnapshot", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
         assertTrue(cluster.start(peers.get(0), false, 100000));
         // start other peers
         for (int i = 1; i < 10; i++) {
@@ -3427,8 +3427,8 @@ public class ItNodeTest {
         cluster.ensureSame();
         assertEquals(10, cluster.getFsms().size());
         for (MockStateMachine fsm : cluster.getFsms()) {
-            assertTrue(fsm.getLogs().size() >= tasks);
-            assertTrue(fsm.getLogs().size() - tasks < 100);
+            final int logSize = fsm.getLogs().size();
+            assertTrue(logSize >= tasks, "logSize=" + logSize);
         }
     }
 
@@ -3437,7 +3437,7 @@ public class ItNodeTest {
         // start cluster
         List<TestPeer> peers = new ArrayList<>();
         peers.add(new TestPeer(testInfo, TestUtils.INIT_PORT));
-        cluster = new TestCluster("unittest", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
+        cluster = new TestCluster("testChangePeersChaosApplyTasks", dataPath, peers, ELECTION_TIMEOUT_MILLIS, testInfo);
         assertTrue(cluster.start(peers.get(0), false, 100000));
         // start other peers
         for (int i = 1; i < 10; i++) {
@@ -3503,7 +3503,6 @@ public class ItNodeTest {
         for (MockStateMachine fsm : cluster.getFsms()) {
             int logSize = fsm.getLogs().size();
             assertTrue(logSize >= 5000 * threads, "logSize= " + logSize);
-            assertTrue(logSize - 5000 * threads < 100, "logSize= " + logSize);
         }
     }
 
@@ -3904,20 +3903,17 @@ public class ItNodeTest {
     }
 
     @SuppressWarnings("SameParameterValue")
-    private void sendTestTaskAndWait(String prefix, Node node, int code) throws InterruptedException {
-        sendTestTaskAndWait(prefix, node, 10, code);
-    }
-
-    @SuppressWarnings("SameParameterValue")
-    private void sendTestTaskAndWait(String prefix, Node node, int amount,
+    private int sendTestTaskAndWait(String prefix, Node node, int amount,
                                      int code) throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(10);
+        final AtomicInteger successCount = new AtomicInteger(0);
         for (int i = 0; i < amount; i++) {
             ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes(UTF_8));
-            Task task = new Task(data, new ExpectClosure(code, null, latch));
+            Task task = new Task(data, new ExpectClosure(code, null, latch, successCount));
             node.apply(task);
         }
         waitLatch(latch);
+        return successCount.get();
     }
 
     private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
index 9456e17565..4d4b89f5c7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/FSMCallerImpl.java
@@ -230,11 +230,7 @@ public class FSMCallerImpl implements FSMCaller {
             return false;
         }
 
-        if (!this.taskQueue.tryPublishEvent(tpl)) {
-            setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
-                "FSMCaller is overload.")));
-            return false;
-        }
+        this.taskQueue.publishEvent(tpl);
         return true;
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 15cc2c0adb..1d83ea60e7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -78,6 +79,7 @@ import org.apache.ignite.raft.jraft.entity.Task;
 import org.apache.ignite.raft.jraft.entity.UserLog;
 import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException;
 import org.apache.ignite.raft.jraft.error.LogNotFoundException;
+import org.apache.ignite.raft.jraft.error.OverloadException;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.error.RaftException;
 import org.apache.ignite.raft.jraft.option.BallotBoxOptions;
@@ -125,7 +127,6 @@ import org.apache.ignite.raft.jraft.util.RepeatedTimer;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.StringUtils;
 import org.apache.ignite.raft.jraft.util.SystemPropertyUtil;
-import org.apache.ignite.raft.jraft.util.ThreadHelper;
 import org.apache.ignite.raft.jraft.util.ThreadId;
 import org.apache.ignite.raft.jraft.util.TimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.Utils;
@@ -140,9 +141,6 @@ public class NodeImpl implements Node, RaftServerService {
 
     public static final Status LEADER_STEPPED_DOWN = new Status(RaftError.EPERM, "Leader stepped down.");
 
-    // Max retry times when applying tasks.
-    private static final int MAX_APPLY_RETRY_TIMES = 3;
-
     private volatile HybridClock clock;
 
     /**
@@ -1602,7 +1600,8 @@ public class NodeImpl implements Node, RaftServerService {
                     st.setError(RaftError.EBUSY, "Is transferring leadership.");
                 }
                 LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
-                final List<Closure> dones = tasks.stream().map(ele -> ele.done).collect(Collectors.toList());
+                final List<Closure> dones = tasks.stream().map(ele -> ele.done)
+                        .filter(Objects::nonNull).collect(Collectors.toList());
                 Utils.runInThread(this.getOptions().getCommonExecutor(), () -> {
                     for (final Closure done : dones) {
                         done.run(st);
@@ -1857,36 +1856,30 @@ public class NodeImpl implements Node, RaftServerService {
 
         final LogEntry entry = new LogEntry();
         entry.setData(task.getData());
-        int retryTimes = 0;
-        try {
-            final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
-                event.reset();
-                event.nodeId = getNodeId();
-                event.done = task.getDone();
-                event.entry = entry;
-                event.expectedTerm = task.getExpectedTerm();
-            };
-            while (true) {
-                if (this.applyQueue.tryPublishEvent(translator)) {
-                    break;
-                }
-                else {
-                    retryTimes++;
-                    if (retryTimes > MAX_APPLY_RETRY_TIMES) {
-                        Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(),
-                            new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
-                        LOG.warn("Node {} applyQueue is overload.", getNodeId());
-                        this.metrics.recordTimes("apply-task-overload-times", 1);
-                        return;
+
+        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
+            event.reset();
+            event.nodeId = getNodeId();
+            event.done = task.getDone();
+            event.entry = entry;
+            event.expectedTerm = task.getExpectedTerm();
+        };
+        switch (this.options.getApplyTaskMode()) {
+            case Blocking:
+                this.applyQueue.publishEvent(translator);
+                break;
+            case NonBlocking:
+            default:
+                if (!this.applyQueue.tryPublishEvent(translator)) {
+                    String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
+                    Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EBUSY, errorMsg));
+                    LOG.warn("Node {} applyQueue is overload.", getNodeId());
+                    this.metrics.recordTimes("apply-task-overload-times", 1);
+                    if (task.getDone() == null) {
+                        throw new OverloadException(errorMsg);
                     }
-                    ThreadHelper.onSpinWait();
-                }
             }
-
-        }
-        catch (final Exception e) {
-            LOG.error("Fail to apply task.", e);
-            Utils.runClosureInThread(this.getOptions().getCommonExecutor(), task.getDone(), new Status(RaftError.EPERM, "Node is down."));
+            break;
         }
     }
 
@@ -2153,6 +2146,7 @@ public class NodeImpl implements Node, RaftServerService {
         final long startMs = Utils.monotonicMs();
         this.writeLock.lock();
         final int entriesCount = Utils.size(request.entriesList());
+        boolean success = false;
         try {
             if (!this.state.isActive()) {
                 LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
@@ -2257,6 +2251,24 @@ public class NodeImpl implements Node, RaftServerService {
                 return respBuilder.build();
             }
 
+            // fast checking if log manager is overloaded
+            if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
+                LOG.warn("Node {} received AppendEntriesRequest but log manager is busy.", getNodeId());
+
+                AppendEntriesResponseBuilder rb = raftOptions.getRaftMessagesFactory()
+                        .appendEntriesResponse()
+                        .success(false)
+                        .errorCode(RaftError.EBUSY.getNumber())
+                        .errorMsg(String.format("Node %s:%s log manager is busy.", this.groupId, this.serverId))
+                        .term(this.currTerm);
+
+                if (request.timestamp() != null) {
+                    rb.timestampLong(clock.update(request.timestamp()).longValue());
+                }
+
+                return rb.build();
+            }
+
             // Parse request
             long index = prevLogIndex;
             final List<LogEntry> entries = new ArrayList<>(entriesCount);
@@ -2296,14 +2308,23 @@ public class NodeImpl implements Node, RaftServerService {
             this.logManager.appendEntries(entries, closure);
             // update configuration after _log_manager updated its memory status
             checkAndSetConfiguration(true);
+            success = true;
             return null;
         }
         finally {
             if (doUnlock) {
                 this.writeLock.unlock();
             }
-            this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
-            this.metrics.recordSize("handle-append-entries-count", entriesCount);
+            final long processLatency = Utils.monotonicMs() - startMs;
+            if (entriesCount == 0) {
+                this.metrics.recordLatency("handle-heartbeat-requests", processLatency);
+            } else {
+                this.metrics.recordLatency("handle-append-entries", processLatency);
+            }
+            if (success) {
+                // Don't record stats for heartbeat requests.
+                this.metrics.recordSize("handle-append-entries-count", entriesCount);
+            }
         }
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
index e6aae9f426..f97eea2ae2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceImpl.java
@@ -41,6 +41,7 @@ import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.NodeId;
 import org.apache.ignite.raft.jraft.entity.ReadIndexState;
 import org.apache.ignite.raft.jraft.entity.ReadIndexStatus;
+import org.apache.ignite.raft.jraft.error.OverloadException;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.error.RaftException;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
@@ -53,15 +54,12 @@ import org.apache.ignite.raft.jraft.util.ByteString;
 import org.apache.ignite.raft.jraft.util.Bytes;
 import org.apache.ignite.raft.jraft.util.DisruptorMetricSet;
 import org.apache.ignite.raft.jraft.util.OnlyForTest;
-import org.apache.ignite.raft.jraft.util.ThreadHelper;
 import org.apache.ignite.raft.jraft.util.Utils;
 
 /**
  * Read-only service implementation.
  */
 public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndexListener {
-    private static final int MAX_ADD_REQUEST_RETRY_TIMES = 3;
-
     /**
      * Disruptor to run readonly service.
      */
@@ -194,10 +192,19 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
                     notifySuccess(readIndexStatus);
                 }
                 else {
-                    // Not applied, add it to pending-notify cache.
-                    ReadOnlyServiceImpl.this.pendingNotifyStatus
-                        .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
-                        .add(readIndexStatus);
+                    if (readIndexStatus.isOverMaxReadIndexLag(
+                            ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex(),
+                            ReadOnlyServiceImpl.this.raftOptions.getMaxReadIndexLag())
+                    ) {
+                        ReadOnlyServiceImpl.this.lock.unlock();
+                        doUnlock = false;
+                        notifyFail(new Status(-1, "Fail to run ReadIndex task, the gap of current node's apply index between leader's commit index over maxReadIndexLag"));
+                    } else  {
+                        // Not applied, add it to pending-notify cache.
+                        ReadOnlyServiceImpl.this.pendingNotifyStatus
+                            .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
+                            .add(readIndexStatus);
+                    }
                 }
             }
             finally {
@@ -329,22 +336,23 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
                 event.requestContext = new Bytes(reqCtx);
                 event.startTime = Utils.monotonicMs();
             };
-            int retryTimes = 0;
-            while (true) {
-                if (this.readIndexQueue.tryPublishEvent(translator)) {
+
+            switch (this.node.getOptions().getApplyTaskMode()) {
+                case Blocking:
+                    this.readIndexQueue.publishEvent(translator);
                     break;
-                }
-                else {
-                    retryTimes++;
-                    if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
-                        Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure,
-                            new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
+                case NonBlocking:
+                default:
+                    if (!this.readIndexQueue.tryPublishEvent(translator)) {
+                        String errorMsg = "Node is busy, has too many read-index requests, queue is full and bufferSize="+ this.readIndexQueue.getBufferSize();
+                        Utils.runClosureInThread(this.node.getOptions().getCommonExecutor(), closure, new Status(RaftError.EBUSY, errorMsg));
                         this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                         LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
-                        return;
-                    }
-                    ThreadHelper.onSpinWait();
-                }
+                        if(closure == null) {
+                            throw new OverloadException(errorMsg);
+                        }
+                  }
+                  break;
             }
         }
         catch (final Exception e) {
@@ -419,6 +427,11 @@ public class ReadOnlyServiceImpl implements ReadOnlyService, LastAppliedLogIndex
         return this.pendingNotifyStatus;
     }
 
+    @OnlyForTest
+    RaftOptions getRaftOptions() {
+        return this.raftOptions;
+    }
+
     private void reportError(final ReadIndexStatus status, final Status st) {
         final long nowMs = Utils.monotonicMs();
         final List<ReadIndexState> states = status.getStates();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
index 9e631d8678..218f79e038 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java
@@ -56,6 +56,7 @@ import org.apache.ignite.raft.jraft.rpc.Message;
 import org.apache.ignite.raft.jraft.rpc.RaftClientService;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesResponse;
+import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotRequest;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.InstallSnapshotResponse;
 import org.apache.ignite.raft.jraft.rpc.RpcRequests.TimeoutNowRequest;
@@ -596,8 +597,8 @@ public class Replicator implements ThreadId.OnError {
             return;
         }
         boolean doUnlock = true;
-        if (!rpcService.connect(options.getPeerId())) {
-            LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", options.getPeerId());
+        if (!this.rpcService.connect(this.options.getPeerId())) {
+            LOG.error("Fail to check install snapshot connection to peer={}, give up to send install snapshot request.", this.options.getPeerId());
             block(Utils.nowMs(), RaftError.EHOSTDOWN.getNumber());
             return;
         }
@@ -1444,6 +1445,20 @@ public class Replicator implements ThreadId.OnError {
         }
         r.consecutiveErrorTimes = 0;
         if (!response.success()) {
+             // Target node is busy, sleep for a while.
+            if (response.errorCode() == RaftError.EBUSY.getNumber()) {
+                if (isLogDebugEnabled) {
+                    sb.append(" is busy, sleep, errorMsg='") //
+                        .append(response.errorMsg()).append("'");
+                    LOG.debug(sb.toString());
+                }
+                r.resetInflights();
+                r.setState(State.Probe);
+                // unlock in block
+                r.block(startTimeMs, status.getCode());
+                return false;
+            }
+
             if (response.term() > r.options.getTerm()) {
                 if (isLogDebugEnabled) {
                     sb.append(" fail, greater term ") //
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
index 8022e24bd1..c4826b3a64 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LogEntry.java
@@ -152,10 +152,35 @@ public class LogEntry implements Checksum {
         this.oldPeers = oldPeers;
     }
 
+    /**
+     * Returns the log data. The buffer is not read-only, so you SHOULD take care of its modification
+     * and thread-safety yourself.
+     *
+     * @return the log data
+     */
     public ByteBuffer getData() {
         return this.data;
     }
 
+    /**
+     * Creates a new byte buffer whose content is a shared subsequence of this log entry's data
+     * buffer's content.
+     *
+     * @return The new byte buffer
+     */
+    public ByteBuffer sliceData() {
+        return this.data != null ? this.data.slice() : null;
+    }
+
+    /**
+     * Creates a new, read-only byte buffer that shares this log entry's data buffer's content.
+     *
+     * @return the new, read-only byte buffer
+     */
+    public ByteBuffer getReadOnlyData() {
+        return this.data != null ? this.data.asReadOnlyBuffer() : null;
+    }
+
     public void setData(final ByteBuffer data) {
         this.data = data;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
index ae7de63c54..23f772175c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
@@ -39,6 +39,13 @@ public class ReadIndexStatus {
         return appliedIndex >= this.index;
     }
 
+    public boolean isOverMaxReadIndexLag(long applyIndex, int maxReadIndexLag) {
+        if (maxReadIndexLag < 0) {
+            return false;
+        }
+        return this.index - applyIndex > maxReadIndexLag;
+    }
+
     public long getIndex() {
         return index;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
similarity index 50%
copy from modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
index ae7de63c54..29efb4eb7c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/ReadIndexStatus.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/OverloadException.java
@@ -14,41 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.raft.jraft.entity;
-
-import java.util.List;
-import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexRequest;
+package org.apache.ignite.raft.jraft.error;
 
 /**
- * ReadIndex requests statuses.
+ * Thrown when Node is overloaded.
  */
-public class ReadIndexStatus {
+public class OverloadException extends JRaftException {
 
-    private final ReadIndexRequest request; // raw request
-    private final List<ReadIndexState> states; // read index requests in batch.
-    private final long index;  // committed log index.
+    /**
+     *
+     */
+    private static final long serialVersionUID = -5505054326197103575L;
 
-    public ReadIndexStatus(List<ReadIndexState> states, ReadIndexRequest request, long index) {
+    public OverloadException() {
         super();
-        this.index = index;
-        this.request = request;
-        this.states = states;
     }
 
-    public boolean isApplied(long appliedIndex) {
-        return appliedIndex >= this.index;
+    public OverloadException(final String message, final Throwable cause, final boolean enableSuppression,
+                             final boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
     }
 
-    public long getIndex() {
-        return index;
+    public OverloadException(final String message, final Throwable cause) {
+        super(message, cause);
     }
 
-    public ReadIndexRequest getRequest() {
-        return request;
+    public OverloadException(final String message) {
+        super(message);
     }
 
-    public List<ReadIndexState> getStates() {
-        return states;
+    public OverloadException(final Throwable cause) {
+        super(cause);
     }
-
-}
\ No newline at end of file
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java
new file mode 100644
index 0000000000..9925d2bc2b
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/ApplyTaskMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.option;
+
+/**
+ * Apply task in blocking or non-blocking mode.
+ *
+ */
+public enum ApplyTaskMode {
+    // Using blocking mode is strongly discouraged because blocking code is forbidden in Ignite worker threads.
+    // This mode is kept mostly to avoid diverging from the original JRaft.
+    Blocking,
+    NonBlocking
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index 678dc61766..b674b9b16d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -253,6 +253,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     /** */
     private List<Stripe> logStripes;
 
+    /**
+     * Apply task in blocking or non-blocking mode, ApplyTaskMode.NonBlocking by default.
+     */
+    private ApplyTaskMode applyTaskMode = ApplyTaskMode.NonBlocking;
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
     }
@@ -279,7 +284,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
      * @return {@code true} if shared pools mode is in use.
      */
     public boolean isSharedPools() {
-        return sharedPools;
+        return this.sharedPools;
     }
 
     /**
@@ -289,6 +294,14 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         this.sharedPools = sharedPools;
     }
 
+    public ApplyTaskMode getApplyTaskMode() {
+        return this.applyTaskMode;
+    }
+
+    public void setApplyTaskMode(final ApplyTaskMode applyTaskMode) {
+        this.applyTaskMode = applyTaskMode;
+    }
+
     /**
      * Service factory.
      */
@@ -374,7 +387,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public int getElectionPriority() {
-        return electionPriority;
+        return this.electionPriority;
     }
 
     public void setElectionPriority(int electionPriority) {
@@ -382,7 +395,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public int getDecayPriorityGap() {
-        return decayPriorityGap;
+        return this.decayPriorityGap;
     }
 
     public void setDecayPriorityGap(int decayPriorityGap) {
@@ -423,7 +436,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public int getSnapshotLogIndexMargin() {
-        return snapshotLogIndexMargin;
+        return this.snapshotLogIndexMargin;
     }
 
     public void setSnapshotLogIndexMargin(int snapshotLogIndexMargin) {
@@ -507,11 +520,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public ExecutorService getCommonExecutor() {
-        return commonExecutor;
+        return this.commonExecutor;
     }
 
     public FixedThreadsExecutorGroup getStripedExecutor() {
-        return stripedExecutor;
+        return this.stripedExecutor;
     }
 
     public void setStripedExecutor(FixedThreadsExecutorGroup stripedExecutor) {
@@ -519,7 +532,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public Scheduler getScheduler() {
-        return scheduler;
+        return this.scheduler;
     }
 
     public void setScheduler(Scheduler scheduler) {
@@ -527,7 +540,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public Timer getElectionTimer() {
-        return electionTimer;
+        return this.electionTimer;
     }
 
     public void setElectionTimer(Timer electionTimer) {
@@ -535,7 +548,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public Timer getVoteTimer() {
-        return voteTimer;
+        return this.voteTimer;
     }
 
     public void setVoteTimer(Timer voteTimer) {
@@ -543,7 +556,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public Timer getSnapshotTimer() {
-        return snapshotTimer;
+        return this.snapshotTimer;
     }
 
     public void setSnapshotTimer(Timer snapshotTimer) {
@@ -551,7 +564,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public Timer getStepDownTimer() {
-        return stepDownTimer;
+        return this.stepDownTimer;
     }
 
     public void setStepDownTimer(Timer stepDownTimer) {
@@ -559,7 +572,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public String getServerName() {
-        return serverName;
+        return this.serverName;
     }
 
     public void setServerName(String serverName) {
@@ -567,7 +580,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public StripedDisruptor<FSMCallerImpl.ApplyTask> getfSMCallerExecutorDisruptor() {
-        return fSMCallerExecutorDisruptor;
+        return this.fSMCallerExecutorDisruptor;
     }
 
     public void setfSMCallerExecutorDisruptor(StripedDisruptor<FSMCallerImpl.ApplyTask> fSMCallerExecutorDisruptor) {
@@ -575,7 +588,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public StripedDisruptor<NodeImpl.LogEntryAndClosure> getNodeApplyDisruptor() {
-        return nodeApplyDisruptor;
+        return this.nodeApplyDisruptor;
     }
 
     public void setNodeApplyDisruptor(StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeApplyDisruptor) {
@@ -583,7 +596,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> getReadOnlyServiceDisruptor() {
-        return readOnlyServiceDisruptor;
+        return this.readOnlyServiceDisruptor;
     }
 
     public void setReadOnlyServiceDisruptor(StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor) {
@@ -591,7 +604,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public StripedDisruptor<LogManagerImpl.StableClosureEvent> getLogManagerDisruptor() {
-        return logManagerDisruptor;
+        return this.logManagerDisruptor;
     }
 
     public void setLogManagerDisruptor(StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor) {
@@ -603,11 +616,11 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
     }
 
     public List<Stripe> getLogStripes() {
-        return logStripes;
+        return this.logStripes;
     }
 
     public HybridClock getClock() {
-        return clock;
+        return this.clock;
     }
 
     public void setClock(HybridClock clock) {
@@ -656,18 +669,18 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
         return nodeOptions;
     }
 
-    @Override
     public String toString() {
-        return "NodeOptions{" + "electionTimeoutMs=" + electionTimeoutMs + ", electionPriority=" + electionPriority
-            + ", decayPriorityGap=" + decayPriorityGap + ", leaderLeaseTimeRatio=" + leaderLeaseTimeRatio
-            + ", snapshotIntervalSecs=" + snapshotIntervalSecs + ", snapshotLogIndexMargin="
-            + snapshotLogIndexMargin + ", catchupMargin=" + catchupMargin + ", initialConf=" + initialConf
-            + ", fsm=" + fsm + ", logUri='" + logUri + '\'' + ", raftMetaUri='" + raftMetaUri + '\''
-            + ", snapshotUri='" + snapshotUri + '\'' + ", filterBeforeCopyRemote=" + filterBeforeCopyRemote
-            + ", disableCli=" + disableCli + ", timerPoolSize="
-            + timerPoolSize + ", cliRpcThreadPoolSize=" + cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
-            + raftRpcThreadPoolSize + ", enableMetrics=" + enableMetrics + ", snapshotThrottle=" + snapshotThrottle
-            + ", serviceFactory=" + serviceFactory + ", raftOptions=" + raftOptions + "} " + super.toString();
+        return "NodeOptions{" + "electionTimeoutMs=" + this.electionTimeoutMs + ", electionPriority="
+               + this.electionPriority + ", decayPriorityGap=" + this.decayPriorityGap + ", leaderLeaseTimeRatio="
+               + this.leaderLeaseTimeRatio + ", snapshotIntervalSecs=" + this.snapshotIntervalSecs
+               + ", snapshotLogIndexMargin=" + this.snapshotLogIndexMargin + ", catchupMargin=" + this.catchupMargin
+               + ", initialConf=" + this.initialConf + ", fsm=" + this.fsm + ", logUri='" + this.logUri + '\''
+               + ", raftMetaUri='" + this.raftMetaUri + '\'' + ", snapshotUri='" + this.snapshotUri + '\''
+               + ", filterBeforeCopyRemote=" + this.filterBeforeCopyRemote + ", disableCli=" + this.disableCli + ", timerPoolSize="
+               + this.timerPoolSize + ", cliRpcThreadPoolSize=" + this.cliRpcThreadPoolSize + ", raftRpcThreadPoolSize="
+               + this.raftRpcThreadPoolSize + ", enableMetrics=" + this.enableMetrics + ", snapshotThrottle="
+               + this.snapshotThrottle + ", serviceFactory=" + this.serviceFactory + ", applyTaskMode="
+               + this.applyTaskMode + ", raftOptions=" + this.raftOptions + "} " + super.toString();
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
index ee92c0de16..979acc04c9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/RaftOptions.java
@@ -122,6 +122,17 @@ public class RaftOptions implements Copiable<RaftOptions> {
      */
     private ReadOnlyOption readOnlyOptions = ReadOnlyOption.ReadOnlySafe;
 
+    /**
+     * A read-index read compares the current node's apply index with the leader's commit index.
+     * The read index closure is called back with success only once the current node's apply index has caught up
+     * with the leader's commit index, so there is a waiting time. The default wait timeout is 2s: if the wait
+     * exceeds 2s, the read index closure is called back with a failure. If the current node has a problem, its
+     * apply index may lag behind the leader's commit index and be unable to catch up within the read index timeout,
+     * so the wait is wasted. This option addresses that: if the gap is greater than maxReadIndexLag, fail fast
+     * and call back the read index closure with a failure.
+     */
+    private int maxReadIndexLag = -1;
+
     /**
      * Candidate steps down when election reaching timeout, default is true(enabled).
      */
@@ -159,6 +170,14 @@ public class RaftOptions implements Copiable<RaftOptions> {
         this.readOnlyOptions = readOnlyOptions;
     }
 
+    public int getMaxReadIndexLag() {
+        return maxReadIndexLag;
+    }
+
+    public void setMaxReadIndexLag(int maxReadIndexLag) {
+        this.maxReadIndexLag = maxReadIndexLag;
+    }
+
     public boolean isReplicatorPipeline() {
         return this.replicatorPipeline;
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 2bde5df2bf..d9a702eddd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -189,7 +189,7 @@ public final class RpcRequests {
     }
 
     @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_RESPONSE)
-    public interface AppendEntriesResponse extends Message {
+    public interface AppendEntriesResponse extends ErrorResponse {
         long term();
 
         boolean success();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
index 6e0ea8cef0..a2afe58770 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/AbstractClientService.java
@@ -94,7 +94,7 @@ public abstract class AbstractClientService implements ClientService, TopologyEv
         configRpcClient(this.rpcClient);
 
         // TODO asch should the client be created lazily? A client doesn't make sence without a server IGNITE-14832
-        this.rpcClient.init(null);
+        this.rpcClient.init(this.rpcOptions);
 
         this.rpcExecutor = rpcOptions.getClientExecutor();
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
index 1d4909a55b..8e2e908ecd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/LogManager.java
@@ -105,6 +105,14 @@ public interface LogManager extends Lifecycle<LogManagerOptions>, Describer {
      */
     void join() throws InterruptedException;
 
+    /**
+     * Determines whether the specified <tt>requiredCapacity</tt> amount of space
+     * is available to append entries.
+     * @param requiredCapacity The capacity required to append the entries.
+     * @return {@code true} when the required capacity is available.
+     */
+    boolean hasAvailableCapacityToAppendEntries(final int requiredCapacity);
+
     /**
      * Append log entry vector and wait until it's stable (NOT COMMITTED!)
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index a6363108bb..82198b7c93 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -65,8 +65,6 @@ import org.apache.ignite.raft.jraft.util.Utils;
  * LogManager implementation.
  */
 public class LogManagerImpl implements LogManager {
-    private static final int APPEND_LOG_RETRY_TIMES = 50;
-
     private static final IgniteLogger LOG = Loggers.forClass(LogManagerImpl.class);
 
     /** Raft node id. */
@@ -206,6 +204,14 @@ public class LogManagerImpl implements LogManager {
         return true;
     }
 
+    @Override
+    public boolean hasAvailableCapacityToAppendEntries(final int requiredCapacity) {
+        if (this.stopped) {
+            return false;
+        }
+        return this.diskQueue.hasAvailableCapacity(requiredCapacity);
+    }
+
     private void stopDiskThread() {
         if (this.diskQueue == null)
             return; // Was not started.
@@ -284,6 +290,8 @@ public class LogManagerImpl implements LogManager {
 
     @Override
     public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
+        assert(done != null);
+
         Requires.requireNonNull(done, "done");
         if (this.hasError) {
             entries.clear();
@@ -293,7 +301,7 @@ public class LogManagerImpl implements LogManager {
         boolean doUnlock = true;
         this.writeLock.lock();
         try {
-            if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
+            if (!entries.isEmpty() && !checkAndResolveConflict(entries, done, this.writeLock)) {
                 // If checkAndResolveConflict returns false, the done will be called in it.
                 entries.clear();
                 return;
@@ -320,30 +328,18 @@ public class LogManagerImpl implements LogManager {
             }
             done.setEntries(entries);
 
-            int retryTimes = 0;
-            final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
-                event.reset();
-                event.nodeId = this.nodeId;
-                event.type = EventType.OTHER;
-                event.done = done;
-            };
-            while (true) {
-                if (tryOfferEvent(done, translator)) {
-                    break;
-                }
-                else {
-                    retryTimes++;
-                    if (retryTimes > APPEND_LOG_RETRY_TIMES) {
-                        reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
-                        return;
-                    }
-                    ThreadHelper.onSpinWait();
-                }
-            }
             doUnlock = false;
             if (!wakeupAllWaiter(this.writeLock)) {
                 notifyLastLogIndexListeners();
             }
+
+            // Publish the event outside of the lock.
+            this.diskQueue.publishEvent((event, sequence) -> {
+              event.reset();
+              event.nodeId = this.nodeId;
+              event.type = EventType.OTHER;
+              event.done = done;
+            });
         }
         finally {
             if (doUnlock) {
@@ -352,29 +348,24 @@ public class LogManagerImpl implements LogManager {
         }
     }
 
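+    /**
+     * Adds an event to the disk queue. NEVER call it while holding a lock.
+     * @param done Closure attached to the published event; run with an ESTOP status if the log manager is stopped.
+     * @param type Type of the published event.
+     */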
     private void offerEvent(final StableClosure done, final EventType type) {
+        assert(done != null);
+
         if (this.stopped) {
             Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
             return;
         }
-        if (!this.diskQueue.tryPublishEvent((event, sequence) -> {
+        this.diskQueue.publishEvent((event, sequence) -> {
             event.reset();
             event.nodeId = this.nodeId;
             event.type = type;
             event.done = done;
-        })) {
-            reportError(RaftError.EBUSY.getNumber(), "Log manager is overload.");
-            Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.EBUSY, "Log manager is overload."));
-        }
-    }
-
-    private boolean tryOfferEvent(final StableClosure done, final EventTranslator<StableClosureEvent> translator) {
-        if (this.stopped) {
-            Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, new Status(RaftError.ESTOP, "Log manager is stopped."));
-            return true;
-        }
-
-        return this.diskQueue.tryPublishEvent(translator);
+        });
     }
 
     private void notifyLastLogIndexListeners() {
@@ -622,6 +613,7 @@ public class LogManagerImpl implements LogManager {
     @Override
     public void setSnapshot(final SnapshotMeta meta) {
         LOG.debug("set snapshot: {}.", meta);
+        boolean doUnlock = true;
         this.writeLock.lock();
         try {
             if (meta.lastIncludedIndex() <= this.lastSnapshotId.getIndex()) {
@@ -654,7 +646,9 @@ public class LogManagerImpl implements LogManager {
             if (term == 0) {
                 // last_included_index is larger than last_index
                 // FIXME: what if last_included_index is less than first_index?
-                truncatePrefix(meta.lastIncludedIndex() + 1);
+                doUnlock = false;
+                //unlock in truncatePrefix
+                truncatePrefix(meta.lastIncludedIndex() + 1, this.writeLock);
             }
             else if (term == meta.lastIncludedTerm()) {
                 // Truncating log to the index of the last snapshot.
@@ -663,7 +657,9 @@ public class LogManagerImpl implements LogManager {
                 // followers
                 // TODO if there are still be need? TODO asch
                 if (savedLastSnapshotIndex > 0) {
-                    truncatePrefix(savedLastSnapshotIndex + 1);
+                    doUnlock = false;
+                    //unlock in truncatePrefix
+                    truncatePrefix(savedLastSnapshotIndex + 1, this.writeLock);
                 }
             }
             else {
@@ -673,7 +669,9 @@ public class LogManagerImpl implements LogManager {
             }
         }
         finally {
-            this.writeLock.unlock();
+            if (doUnlock) {
+                this.writeLock.unlock();
+            }
         }
 
     }
@@ -723,14 +721,19 @@ public class LogManagerImpl implements LogManager {
 
     @Override
     public void clearBufferedLogs() {
+        boolean doUnlock = true;
         this.writeLock.lock();
         try {
             if (this.lastSnapshotId.getIndex() != 0) {
-                truncatePrefix(this.lastSnapshotId.getIndex() + 1);
+                doUnlock = false;
+                //unlock in truncatePrefix
+                truncatePrefix(this.lastSnapshotId.getIndex() + 1, this.writeLock);
             }
         }
         finally {
-            this.writeLock.unlock();
+            if (doUnlock) {
+                this.writeLock.unlock();
+            }
         }
     }
 
@@ -869,12 +872,12 @@ public class LogManagerImpl implements LogManager {
                     return this.lastLogIndex;
                 }
                 c = new LastLogIdClosure();
-                offerEvent(c, EventType.LAST_LOG_ID);
             }
         }
         finally {
             this.readLock.unlock();
         }
+        offerEvent(c, EventType.LAST_LOG_ID);
         try {
             c.await();
         }
@@ -920,12 +923,12 @@ public class LogManagerImpl implements LogManager {
                     return this.lastSnapshotId;
                 }
                 c = new LastLogIdClosure();
-                offerEvent(c, EventType.LAST_LOG_ID);
             }
         }
         finally {
             this.readLock.unlock();
         }
+        offerEvent(c, EventType.LAST_LOG_ID);
         try {
             c.await(); // TODO FIXME asch https://issues.apache.org/jira/browse/IGNITE-15974
         }
@@ -982,7 +985,7 @@ public class LogManagerImpl implements LogManager {
         }
     }
 
-    private boolean truncatePrefix(final long firstIndexKept) {
+    private boolean truncatePrefix(final long firstIndexKept, final Lock lock) {
 
         this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);
 
@@ -997,6 +1000,7 @@ public class LogManagerImpl implements LogManager {
         }
         LOG.debug("Truncate prefix, firstIndexKept is :{}", firstIndexKept);
         this.configManager.truncatePrefix(firstIndexKept);
+        lock.unlock();
         final TruncatePrefixClosure c = new TruncatePrefixClosure(firstIndexKept);
         offerEvent(c, EventType.TRUNCATE_PREFIX);
         return true;
@@ -1010,16 +1014,16 @@ public class LogManagerImpl implements LogManager {
             this.lastLogIndex = nextLogIndex - 1;
             this.configManager.truncatePrefix(this.firstLogIndex);
             this.configManager.truncateSuffix(this.lastLogIndex);
-            final ResetClosure c = new ResetClosure(nextLogIndex);
-            offerEvent(c, EventType.RESET);
             return true;
         }
         finally {
             this.writeLock.unlock();
+            final ResetClosure c = new ResetClosure(nextLogIndex);
+            offerEvent(c, EventType.RESET);
         }
     }
 
-    private void unsafeTruncateSuffix(final long lastIndexKept) {
+    private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
         if (lastIndexKept < this.appliedId.getIndex()) {
             LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
                 lastIndexKept);
@@ -1033,12 +1037,14 @@ public class LogManagerImpl implements LogManager {
         Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
         LOG.debug("Truncate suffix :{}", lastIndexKept);
         this.configManager.truncateSuffix(lastIndexKept);
+        lock.unlock();
         final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
         offerEvent(c, EventType.TRUNCATE_SUFFIX);
+        lock.lock();
     }
 
     @SuppressWarnings("NonAtomicOperationOnVolatileField")
-    private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done) {
+    private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done, final Lock lock) {
         final LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
         if (firstLogEntry.getId().getIndex() == 0) {
             // Node is currently the leader and |entries| are from the user who
@@ -1088,7 +1094,7 @@ public class LogManagerImpl implements LogManager {
                     if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
                         // Truncate all the conflicting entries to make local logs
                         // consensus with the leader.
-                        unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1);
+                        unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock);
                     }
                     this.lastLogIndex = lastLogEntry.getId().getIndex();
                 } // else this is a duplicated AppendEntriesRequest, we have
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
index c4dfa68762..013eb67476 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/RocksDBLogStorage.java
@@ -178,7 +178,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
         this.writeLock.lock();
         try {
             if (this.db != null) {
-                LOG.warn("RocksDBLogStorage init() already.");
+                LOG.warn("RocksDBLogStorage init() in {} already.", this.path);
                 return true;
             }
             this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
@@ -291,7 +291,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
             return true;
         }
         catch (final RocksDBException e) {
-            LOG.error("Fail to save first log index {}.", e, firstLogIndex);
+            LOG.error("Fail to save first log index {} in {}.", firstLogIndex, this.path, e);
             return false;
         }
         finally {
@@ -325,7 +325,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
     private boolean executeBatch(final WriteBatchTemplate template) {
         this.readLock.lock();
         if (this.db == null) {
-            LOG.warn("DB not initialized or destroyed.");
+            LOG.warn("DB not initialized or destroyed in data path: {}.", this.path);
             this.readLock.unlock();
             return false;
         }
@@ -458,7 +458,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
             }
         }
         catch (final RocksDBException | IOException e) {
-            LOG.error("Fail to get log entry at index {}.", e, index);
+            LOG.error("Fail to get log entry at index {} in data path: {}.", index, this.path, e);
         }
         finally {
             this.readLock.unlock();
@@ -509,7 +509,7 @@ public class RocksDBLogStorage implements LogStorage, Describer {
             this.readLock.lock();
             try {
                 if (this.db == null) {
-                    LOG.warn("DB not initialized or destroyed.");
+                    LOG.warn("DB not initialized or destroyed in data path: {}.", this.path);
                     return false;
                 }
                 final WriteContext writeCtx = newWriteContext();
@@ -593,20 +593,28 @@ public class RocksDBLogStorage implements LogStorage, Describer {
     private void truncatePrefixInBackground(final long startIndex, final long firstIndexKept) {
         // delete logs in background.
         Utils.runInThread(executor, () -> {
+            long startMs = Utils.monotonicMs();
             this.readLock.lock();
             try {
-                if (this.db == null) {
+                RocksDB db = this.db;
+                if (db == null) {
+                    LOG.warn(
+                        "DB is null while truncating prefixed logs in data path: {}, the range is: [{}, {})",
+                        this.path, startIndex, firstIndexKept);
                     return;
                 }
                 onTruncatePrefix(startIndex, firstIndexKept);
-                this.db.deleteRange(this.defaultHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
-                this.db.deleteRange(this.confHandle, getKeyBytes(startIndex), getKeyBytes(firstIndexKept));
-            }
-            catch (final RocksDBException | IOException e) {
-                LOG.error("Fail to truncatePrefix {}.", e, firstIndexKept);
-            }
-            finally {
+                final byte[] startKey = getKeyBytes(startIndex);
+                final byte[] endKey = getKeyBytes(firstIndexKept);
+                // deleteRange to delete all keys in range.
+                db.deleteRange(this.defaultHandle, startKey, endKey);
+                db.deleteRange(this.confHandle, startKey, endKey);
+            } catch (final RocksDBException | IOException e) {
+                LOG.error("Fail to truncatePrefix in data path: {}, firstIndexKept={}.", this.path, firstIndexKept, e);
+            } finally {
                 this.readLock.unlock();
+                LOG.info("Truncated prefix logs in data path: {} from log index {} to {}, cost {} ms.",
+                    this.path, startIndex, firstIndexKept, Utils.monotonicMs() - startMs);
             }
         });
     }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
index 30d966add3..b62a425cd3 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/SnapshotExecutorImpl.java
@@ -230,7 +230,7 @@ public class SnapshotExecutorImpl implements SnapshotExecutor {
         }
         if (snapshotStorage instanceof LocalSnapshotStorage) {
             final LocalSnapshotStorage tmp = (LocalSnapshotStorage) this.snapshotStorage;
-            if (tmp != null && !tmp.hasServerPeerId()) {
+            if (!tmp.hasServerPeerId()) {
                 tmp.setServerPeerId(opts.getPeerId());
             }
         }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
index 97f08d6a46..9e0a5ed98d 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotStorage.java
@@ -223,7 +223,7 @@ public class LocalSnapshotStorage implements SnapshotStorage {
                 break;
             }
             LOG.info("Renaming {} to {}.", tempPath, newPath);
-            if (!new File(tempPath).renameTo(new File(newPath))) {
+            if (!Utils.atomicMoveFile(new File(tempPath), new File(newPath), true)) {
                 LOG.error("Renamed temp snapshot failed, from path {} to path {}.", tempPath, newPath);
                 ret = RaftError.EIO.getNumber();
                 ioe = new IOException("Fail to rename temp snapshot from: " + tempPath + " to: " + newPath);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
index 0a27cc4894..078e64252c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Utils.java
@@ -117,6 +117,17 @@ public final class Utils {
         reg.register(name, new ThreadPoolMetricSet(executor));
     }
 
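+    /**
+     * Runs the closure in the current thread; does nothing if the closure is {@code null}.
+     * @param done Closure to run, may be {@code null}.
+     * @param status Status passed to the closure.
+     */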
+    public static void runClosure(Closure done, Status status) {
+        if (done != null) {
+            done.run(status);
+        }
+    }
+
     /**
      * Run closure with OK status in thread pool.
      */
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index d1d64718aa..eeac58d3f5 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -296,4 +296,52 @@ public class ReadOnlyServiceTest {
         latch.await();
         assertTrue(this.readOnlyServiceImpl.getPendingNotifyStatus().isEmpty());
     }
+
+    /** A ReadIndex response that is more than maxReadIndexLag ahead of the applied index must fail the closure. */
+    @Test
+    public void testOverMaxReadIndexLag() throws Exception {
+        Mockito.when(this.fsmCaller.getLastAppliedIndex()).thenReturn(1L);
+        this.readOnlyServiceImpl.getRaftOptions().setMaxReadIndexLag(50);
+
+        byte[] requestContext = TestUtils.getRandomBytes();
+        CountDownLatch latch = new CountDownLatch(1);
+        final String errMsg =
+                "Fail to run ReadIndex task, the gap of current node's apply index between leader's commit index over maxReadIndexLag";
+        this.readOnlyServiceImpl.addRequest(requestContext, new ReadIndexClosure() {
+            @Override
+            public void run(Status status, long index, byte[] reqCtx) {
+                // Expected value first, per the JUnit assertEquals(expected, actual) contract.
+                assertFalse(status.isOk());
+                assertEquals(errMsg, status.getErrorMsg());
+                assertEquals(-1, index);
+                assertArrayEquals(requestContext, reqCtx);
+                latch.countDown();
+            }
+        });
+        this.readOnlyServiceImpl.flush();
+
+        final ArgumentCaptor<RpcResponseClosure> closureCaptor = ArgumentCaptor.forClass(RpcResponseClosure.class);
+
+        Mockito.verify(this.node).handleReadIndexRequest(Mockito.argThat(new ArgumentMatcher<ReadIndexRequest>() {
+            @Override
+            public boolean matches(ReadIndexRequest argument) {
+                if (argument != null) {
+                    ReadIndexRequest req = argument;
+                    return "test".equals(req.groupId()) && "localhost-8081".equals(req.serverId())
+                            && Utils.size(req.entriesList()) == 1
+                            && Arrays.equals(requestContext, req.entriesList().get(0).toByteArray());
+                }
+                return false;
+            }
+        }), closureCaptor.capture());
+
+        RpcResponseClosure closure = closureCaptor.getValue();
+
+        assertNotNull(closure);
+
+        // Applied index is mocked to 1 and the lag limit is 50, so index 52 is expected to exceed the allowed gap.
+        closure.setResponse(msgFactory.readIndexResponse().index(52).success(true).build());
+        closure.run(Status.OK());
+        latch.await();
+    }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
index 4b7b5e70e3..ca6220a0be 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/entity/LogEntryTest.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.raft.jraft.entity;
 
 import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
 import java.util.Arrays;
 import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
 import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
@@ -27,9 +28,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class LogEntryTest {
     @Test
@@ -122,4 +125,37 @@ public class LogEntryTest {
         assertNotEquals(c, entry.checksum());
         assertTrue(entry.isCorrupted());
     }
+
+    @Test
+    public void testSliceReadOnlyData() {
+        ByteBuffer buf = ByteBuffer.wrap("hello".getBytes());
+        LogEntry entry = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+        entry.setData(buf);
+        assertSame(buf, entry.getData());
+        final ByteBuffer slice = entry.sliceData();
+        assertNotSame(buf, slice);
+        assertEquals(5, slice.remaining());
+        assertEquals("hello", new String(slice.array()));
+        slice.position(4);
+        assertEquals(4, slice.position());
+        assertEquals(0, entry.getData().position());
+        slice.put((byte) 'a');
+        assertEquals(97, slice.get(4));
+        assertEquals("hella", new String(entry.getData().array()));
+        // The read-only view is expected to expose the same bytes as the entry data but reject writes.
+        ByteBuffer readOnly = entry.getReadOnlyData();
+        assertNotSame(buf, readOnly);
+        assertEquals(5, readOnly.remaining());
+        byte[] bs = new byte[5];
+        readOnly.get(bs);
+        assertEquals("hella", new String(bs));
+
+        try {
+            readOnly.position(4);
+            readOnly.put((byte) 1);
+            fail();
+        } catch (ReadOnlyBufferException e) {
+            // Expected: writing into the buffer returned by getReadOnlyData() must be rejected.
+        }
+    }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index bbc882da10..eabd8bbf27 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -133,6 +133,13 @@ public class LogManagerTest extends BaseStorageTest {
         assertTrue(this.logManager.checkConsistency().isOk());
     }
 
+    @Test
+    public void testHasAvailableCapacityToAppendEntries() {
+        assertTrue(this.logManager.hasAvailableCapacityToAppendEntries(1));
+        assertTrue(this.logManager.hasAvailableCapacityToAppendEntries(10));
+        assertFalse(this.logManager.hasAvailableCapacityToAppendEntries(1000000)); // expected to exceed the capacity
+    }
+
     @Test
     public void testAppendOneEntry() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
index e984751d19..1fdb77ff62 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/ExpectClosure.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.raft.jraft.core;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -24,42 +25,52 @@ import org.apache.ignite.raft.jraft.error.RaftError;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ExpectClosure implements Closure {
-    private int expectedErrCode;
-    private String expectErrMsg;
-    private CountDownLatch latch;
+    private final int expectedErrCode;
+    private final String expectErrMsg;
+    private final CountDownLatch latch;
+    private AtomicInteger successCount; // optional counter of OK statuses; null when the caller did not supply one
 
-    public ExpectClosure(CountDownLatch latch) {
+    public ExpectClosure(final CountDownLatch latch) {
         this(RaftError.SUCCESS, latch);
     }
 
-    public ExpectClosure(RaftError expectedErrCode, CountDownLatch latch) {
+    public ExpectClosure(final RaftError expectedErrCode, final CountDownLatch latch) {
         this(expectedErrCode, null, latch);
 
     }
 
-    public ExpectClosure(RaftError expectedErrCode, String expectErrMsg, CountDownLatch latch) {
+    public ExpectClosure(final RaftError expectedErrCode, final String expectErrMsg, final CountDownLatch latch) {
         super();
         this.expectedErrCode = expectedErrCode.getNumber();
         this.expectErrMsg = expectErrMsg;
         this.latch = latch;
     }
 
-    public ExpectClosure(int code, String expectErrMsg, CountDownLatch latch) {
+    public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch) {
+        this(code, expectErrMsg, latch, null);
+    }
+
+    public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch,
+            final AtomicInteger successCount) {
         super();
         this.expectedErrCode = code;
         this.expectErrMsg = expectErrMsg;
         this.latch = latch;
+        this.successCount = successCount;
     }
 
     @Override
-    public void run(Status status) {
+    public void run(final Status status) {
         if (this.expectedErrCode >= 0) {
             assertEquals(this.expectedErrCode, status.getCode());
         }
         if (this.expectErrMsg != null) {
             assertEquals(this.expectErrMsg, status.getErrorMsg());
         }
-        latch.countDown();
+        if (status.isOk() && this.successCount != null) {
+            this.successCount.incrementAndGet();
+        }
+        this.latch.countDown();
     }
 
 }
diff --git a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
index 84c11a97f8..c63b0ad96c 100644
--- a/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
+++ b/modules/raft/src/testFixtures/java/org/apache/ignite/raft/jraft/core/MockStateMachine.java
@@ -35,6 +35,7 @@ import org.apache.ignite.raft.jraft.Iterator;
 import org.apache.ignite.raft.jraft.Status;
 import org.apache.ignite.raft.jraft.entity.LeaderChangeContext;
 import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.error.RaftError;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader;
 import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
@@ -165,7 +166,8 @@ public class MockStateMachine extends StateMachineAdapter {
 
     @Override
     public boolean onSnapshotLoad(final SnapshotReader reader) {
-        this.lastAppliedIndex.set(0);
+        SnapshotMeta meta = reader.load();
+        this.lastAppliedIndex.set(meta.lastIncludedIndex());
         this.loadSnapshotTimes++;
         final String path = reader.getPath() + File.separator + "data";
         final File file = new File(path);