You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/09/25 14:31:13 UTC

[ignite-3] branch main updated: IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f2fc9528fb IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)
f2fc9528fb is described below

commit f2fc9528fb3e00a5ee67845c8927b8b9172a3748
Author: Ivan Bessonov <be...@gmail.com>
AuthorDate: Mon Sep 25 17:31:07 2023 +0300

    IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)
---
 build.gradle                                       |  13 +
 gradle/libs.versions.toml                          |   4 +
 .../apache/ignite/internal/util/IgniteUtils.java   |  24 +
 .../ignite/internal/util/IgniteUtilsTest.java      |  15 +
 modules/raft/build.gradle                          |   1 +
 .../apache/ignite/raft/jraft/core/ItNodeTest.java  |  11 +-
 .../raft/jraft/entity/LocalFileMetaOutter.java     |  31 +-
 .../ignite/raft/jraft/storage/logit/LibC.java      |  59 +++
 .../storage/logit/LogitLogStorageFactory.java      |  70 +++
 .../jraft/storage/logit/option/StoreOptions.java   | 120 +++++
 .../raft/jraft/storage/logit/package-info.java     |  22 +
 .../storage/logit/storage/LogitLogStorage.java     | 554 ++++++++++++++++++++
 .../jraft/storage/logit/storage/db/AbstractDB.java | 415 +++++++++++++++
 .../jraft/storage/logit/storage/db/ConfDB.java     |  40 ++
 .../jraft/storage/logit/storage/db/IndexDB.java    | 147 ++++++
 .../storage/logit/storage/db/SegmentLogDB.java     |  40 ++
 .../logit/storage/factory/LogStoreFactory.java     | 100 ++++
 .../storage/logit/storage/file/AbstractFile.java   | 523 +++++++++++++++++++
 .../storage/logit/storage/file/FileHeader.java     | 112 ++++
 .../storage/logit/storage/file/FileManager.java    | 577 +++++++++++++++++++++
 .../jraft/storage/logit/storage/file/FileType.java |  43 ++
 .../logit/storage/file/assit/AbortFile.java        |  73 +++
 .../logit/storage/file/assit/Checkpoint.java       |  80 +++
 .../file/assit/FirstLogIndexCheckpoint.java        |  68 +++
 .../storage/file/assit/FlushStatusCheckpoint.java  |  81 +++
 .../logit/storage/file/index/IndexFile.java        | 280 ++++++++++
 .../logit/storage/file/index/IndexType.java        |  34 ++
 .../logit/storage/file/segment/SegmentFile.java    | 209 ++++++++
 .../logit/storage/service/AllocateFileService.java | 157 ++++++
 .../logit/storage/service/ServiceManager.java      |  79 +++
 .../ignite/raft/jraft/storage/logit/util/Pair.java |  47 ++
 .../logit/util/concurrent/ReferenceResource.java   |  84 +++
 .../logit/util/concurrent/ShutdownAbleThread.java  |  82 +++
 .../snapshot/local/LocalSnapshotWriter.java        |   5 +-
 .../apache/ignite/raft/jraft/util/Platform.java    |  19 +
 .../jraft/storage/impl/BaseLogStorageTest.java     |  20 +-
 .../raft/jraft/storage/io/MessageFileTest.java     |  12 +-
 .../jraft/storage/logit/BaseLogitStorageTest.java  |  77 +++
 .../jraft/storage/logit/LogitLogStorageTest.java   | 126 +++++
 .../raft/jraft/storage/logit/db/ConfDBTest.java    | 117 +++++
 .../raft/jraft/storage/logit/db/IndexDBTest.java   | 154 ++++++
 .../jraft/storage/logit/db/SegmentLogDBTest.java   | 121 +++++
 .../jraft/storage/logit/file/FileManagerTest.java  | 136 +++++
 .../storage/logit/file/index/IndexFileTest.java    | 135 +++++
 .../logit/file/segment/SegmentFileTest.java        | 125 +++++
 .../snapshot/local/LocalSnapshotMetaTableTest.java |  10 +-
 .../snapshot/local/LocalSnapshotWriterTest.java    |   6 +-
 .../snapshot/local/SnapshotFileReaderTest.java     |   2 +-
 48 files changed, 5210 insertions(+), 50 deletions(-)

diff --git a/build.gradle b/build.gradle
index bd9d33f11f..42f58c4e0a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,6 +19,7 @@
 //This need for resolving plugins in buildscripts
 plugins {
     id('idea')
+    alias(libs.plugins.ideaext)
     alias(libs.plugins.javacc) apply false
     alias(libs.plugins.modernizer) apply false
     alias(libs.plugins.aggregateJavadoc)
@@ -45,6 +46,10 @@ ext {
     ]
 }
 
+def compilerArgs = [
+        "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
+]
+
 allprojects {
     group 'org.apache.ignite'
     version = "3.0.0-SNAPSHOT"
@@ -89,6 +94,7 @@ allprojects {
 
     tasks.withType(JavaCompile) {
         options.encoding = 'UTF-8'
+        options.compilerArgs += compilerArgs
     }
 
     //Temporary hack to disable caching of Test tasks.
@@ -127,3 +133,10 @@ subprojects {
 
     tasks.register('printSubDependencies', DependencyReportTask)
 }
+
+idea.project.settings {
+    compiler.javac {
+        // Workaround on https://youtrack.jetbrains.com/issue/IDEA-154038.
+        javacAdditionalOptions = compilerArgs.join(' ')
+    }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9b8e21c259..320789e57e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -78,6 +78,7 @@ autoService = "1.0.1"
 awaitility = "4.2.0"
 progressBar = "0.9.4"
 guava = "31.1-jre"
+jna = "5.13.0"
 
 #Tools
 pmdTool = "6.55.0"
@@ -95,6 +96,7 @@ docker = "com.palantir.docker:0.34.0"
 checksum = "org.gradle.crypto.checksum:1.4.0"
 setupbuilder = "de.inetsoftware.setupbuilder:7.2.13"
 aggregateJavadoc = "io.freefair.aggregate-javadoc:6.5.1"
+ideaext = "org.jetbrains.gradle.plugin.idea-ext:1.1.7"
 
 [libraries]
 assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj" }
@@ -251,3 +253,5 @@ auto-service-annotations = { module = "com.google.auto.service:auto-service-anno
 awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
 
 progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar" }
+
+jna = { module = "net.java.dev.jna:jna", version.ref = "jna"}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1ea9fed961..5800ac4b2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.Arrays.copyOfRange;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
@@ -1068,6 +1069,29 @@ public class IgniteUtils {
         return result;
     }
 
+    /**
+     * Converts byte buffer into a byte array. The content of the array is all bytes from the position "0" to "capacity". Preserves original
+     * position/limit values in the buffer. Always returns a new instance, instead of accessing the internal buffer's array.
+     */
+    public static byte[] byteBufferToByteArray(ByteBuffer buffer) {
+        if (buffer.hasArray()) {
+            int offset = buffer.arrayOffset();
+
+            return copyOfRange(buffer.array(), offset, offset + buffer.capacity());
+        } else {
+            byte[] array = new byte[buffer.capacity()];
+
+            int originalPosition = buffer.position();
+
+            buffer.position(0);
+            buffer.get(array);
+
+            buffer.position(originalPosition);
+
+            return array;
+        }
+    }
+
     /**
      * Stops all ignite components.
      *
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 7768be4889..b7199bc81d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
 import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo;
 import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
@@ -30,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -40,6 +42,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -200,4 +203,16 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
 
         assertThat(result, willThrow(NumberFormatException.class));
     }
+
+    @Test
+    void testByteBufferToByteArray() {
+        ByteBuffer heapBuffer = ByteBuffer.wrap(new byte[]{0, 1, 2, 3, 4}, 1, 3).slice();
+        assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(heapBuffer));
+
+        ByteBuffer bigDirectBuffer = ByteBuffer.allocateDirect(5);
+        bigDirectBuffer.put(new byte[]{0, 1, 2, 3, 4});
+
+        ByteBuffer smallDirectBuffer = bigDirectBuffer.position(1).limit(4).slice();
+        assertArrayEquals(new byte[] {1, 2, 3}, byteBufferToByteArray(smallDirectBuffer));
+    }
 }
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index f37d6dd86d..e871919f8b 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -36,6 +36,7 @@ dependencies {
     implementation libs.dropwizard.metrics
     implementation libs.jctools.core
     implementation libs.auto.service.annotations
+    implementation libs.jna
 
     annotationProcessor project(":ignite-network-annotation-processor")
     annotationProcessor libs.auto.service
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 61eb954321..66176c3354 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.core;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Collections.synchronizedList;
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
 import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
 import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -2732,13 +2733,13 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         UserLog userLog = leader.readCommittedUserLog(1);
         assertNotNull(userLog);
         assertEquals(2, userLog.getIndex());
-        assertEquals("hello0", stringFromBytes(userLog.getData().array()));
+        assertEquals("hello0", stringFromBytes(byteBufferToByteArray(userLog.getData())));
 
         // index == 5 is a DATA log(a user log)
         userLog = leader.readCommittedUserLog(5);
         assertNotNull(userLog);
         assertEquals(5, userLog.getIndex());
-        assertEquals("hello3", stringFromBytes(userLog.getData().array()));
+        assertEquals("hello3", stringFromBytes(byteBufferToByteArray(userLog.getData())));
 
         // index == 15 is greater than last_committed_index
         try {
@@ -2795,20 +2796,20 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
         userLog = leader.readCommittedUserLog(12);
         assertNotNull(userLog);
         assertEquals(16, userLog.getIndex());
-        assertEquals("hello10", stringFromBytes(userLog.getData().array()));
+        assertEquals("hello10", stringFromBytes(byteBufferToByteArray(userLog.getData())));
 
         // now index == 17 is a user log
         userLog = leader.readCommittedUserLog(17);
         assertNotNull(userLog);
         assertEquals(17, userLog.getIndex());
-        assertEquals("hello11", stringFromBytes(userLog.getData().array()));
+        assertEquals("hello11", stringFromBytes(byteBufferToByteArray(userLog.getData())));
 
         cluster.ensureSame();
         assertEquals(3, cluster.getFsms().size());
         for (MockStateMachine fsm : cluster.getFsms()) {
             assertEquals(20, fsm.getLogs().size());
             for (int i = 0; i < 20; i++)
-                assertEquals("hello" + i, stringFromBytes(fsm.getLogs().get(i).array()));
+                assertEquals("hello" + i, stringFromBytes(byteBufferToByteArray(fsm.getLogs().get(i))));
         }
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
index 6a3b09600d..e32a6e54ae 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
@@ -31,39 +31,29 @@ public final class LocalFileMetaOutter {
      */
     public enum FileSource {
         /**
-         * <code>FILE_SOURCE_LOCAL = 0;</code>
+         * <code>FILE_SOURCE_LOCAL = 1;</code>
          */
-        FILE_SOURCE_LOCAL(0),
+        FILE_SOURCE_LOCAL(1),
         /**
-         * <code>FILE_SOURCE_REFERENCE = 1;</code>
+         * <code>FILE_SOURCE_REFERENCE = 2;</code>
          */
-        FILE_SOURCE_REFERENCE(1);
+        FILE_SOURCE_REFERENCE(2);
 
         public final int getNumber() {
             return value;
         }
 
-        /**
-         * @deprecated Use {@link #forNumber(int)} instead.
-         */
-        @Deprecated
-        public static FileSource valueOf(int value) {
-            return forNumber(value);
-        }
-
         public static FileSource forNumber(int value) {
             switch (value) {
-                case 0:
-                    return FILE_SOURCE_LOCAL;
                 case 1:
+                    return FILE_SOURCE_LOCAL;
+                case 2:
                     return FILE_SOURCE_REFERENCE;
                 default:
                     return null;
             }
         }
 
-        private static final FileSource[] VALUES = values();
-
         private final int value;
 
         private FileSource(int value) {
@@ -73,13 +63,16 @@ public final class LocalFileMetaOutter {
 
     @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_FILE_META)
     public interface LocalFileMeta extends Message {
+        int sourceNumber();
+
         @Nullable
-        @Marshallable
-        FileSource source();
+        default FileSource source() {
+            return FileSource.forNumber(sourceNumber());
+        }
 
         @Nullable
         String checksum();
 
-        boolean hasUserMeta();
+        byte @Nullable[] userMeta();
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java
new file mode 100644
index 0000000000..cb61f04d4c
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Platform;
+import com.sun.jna.Pointer;
+
+/**
+ * Moved from rocketmq.
+ *
+ *  https://raw.githubusercontent.com/apache/rocketmq/master/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+ */
+public interface LibC extends Library {
+    LibC INSTANCE      = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
+
+    int  MADV_WILLNEED = 3;
+    int  MADV_DONTNEED = 4;
+
+    int  MCL_CURRENT   = 1;
+    int  MCL_FUTURE    = 2;
+    int  MCL_ONFAULT   = 4;
+
+    /* sync memory asynchronously */
+    int  MS_ASYNC      = 0x0001;
+    /* invalidate mappings & caches */
+    int  MS_INVALIDATE = 0x0002;
+    /* synchronous memory sync */
+    int  MS_SYNC       = 0x0004;
+
+    int mlock(Pointer var1, NativeLong var2);
+
+    int munlock(Pointer var1, NativeLong var2);
+
+    int madvise(Pointer var1, NativeLong var2, int var3);
+
+    Pointer memset(Pointer p, int v, long len);
+
+    int mlockall(int flags);
+
+    int msync(Pointer p, NativeLong length, int flags);
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
new file mode 100644
index 0000000000..bc66237992
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.LogitLogStorage;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.StringUtils;
+
+public class LogitLogStorageFactory implements LogStorageFactory {
+    private static final IgniteLogger LOG = Loggers.forClass(LogitLogStorageFactory.class);
+
+    public static final String NEW_STORAGE_PATH = "LogitStorage";
+
+    /** Executor for shared storages. */
+    protected final ExecutorService executorService;
+
+    private final StoreOptions storeOptions;
+
+    public LogitLogStorageFactory(StoreOptions storeOptions) {
+        this.storeOptions = storeOptions;
+        executorService = Executors.newFixedThreadPool(
+                Runtime.getRuntime().availableProcessors() * 2,
+                new NamedThreadFactory("raft-shared-log-storage-pool", LOG)
+        );
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public LogStorage createLogStorage(String uri, RaftOptions raftOptions) {
+        Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
+
+        String newStoragePath = Paths.get(uri, NEW_STORAGE_PATH).toString();
+        return new LogitLogStorage(newStoragePath, storeOptions, raftOptions, executorService);
+    }
+
+    @Override
+    public void close() {
+        ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
new file mode 100644
index 0000000000..855373a320
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.option;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+
+/**
+ * Storage options
+ */
+public class StoreOptions {
+
+    private static final String storagePath                   = "localLog";
+
+    // Default is 0.5G
+    private int                 segmentFileSize               = 1024 * 1024 * 512;
+
+    private int                 indexFileSize                 = FileHeader.HEADER_SIZE + 5000000
+                                                                * IndexEntry.INDEX_SIZE;
+
+    private int                 confFileSize                  = 1024 * 1024 * 512;
+
+    // Whether enable warm up file when pre allocate
+    private boolean             enableWarmUpFile              = true;
+
+    // Pre allocate files
+    private int                 preAllocateFileCount          = 2;
+
+    // How many files can be kept in memory, default = preAllocateFileCount + 3
+    private int                 keepInMemoryFileCount         = 5;
+
+    // The max times for flush
+    private int                 maxFlushTimes                 = 200;
+
+    // Checkpoint
+    private int                 checkpointFlushStatusInterval = 5000;
+
+    public String getStoragePath() {
+        return storagePath;
+    }
+
+    public boolean isEnableWarmUpFile() {
+        return enableWarmUpFile;
+    }
+
+    public void setEnableWarmUpFile(final boolean enableWarmUpFile) {
+        this.enableWarmUpFile = enableWarmUpFile;
+    }
+
+    public int getSegmentFileSize() {
+        return segmentFileSize;
+    }
+
+    public void setSegmentFileSize(final int segmentFileSize) {
+        this.segmentFileSize = segmentFileSize;
+    }
+
+    public int getIndexFileSize() {
+        return indexFileSize;
+    }
+
+    public void setIndexFileSize(final int indexFileSize) {
+        this.indexFileSize = indexFileSize;
+    }
+
+    public int getConfFileSize() {
+        return confFileSize;
+    }
+
+    public void setConfFileSize(final int confFileSize) {
+        this.confFileSize = confFileSize;
+    }
+
+    public int getPreAllocateFileCount() {
+        return preAllocateFileCount;
+    }
+
+    public void setPreAllocateFileCount(final int preAllocateFileCount) {
+        this.preAllocateFileCount = preAllocateFileCount;
+    }
+
+    public int getKeepInMemoryFileCount() {
+        return keepInMemoryFileCount;
+    }
+
+    public void setKeepInMemoryFileCount(final int keepInMemoryFileCount) {
+        this.keepInMemoryFileCount = keepInMemoryFileCount;
+    }
+
+    public int getMaxFlushTimes() {
+        return maxFlushTimes;
+    }
+
+    public void setMaxFlushTimes(final int maxFlushTimes) {
+        this.maxFlushTimes = maxFlushTimes;
+    }
+
+    public int getCheckpointFlushStatusInterval() {
+        return checkpointFlushStatusInterval;
+    }
+
+    public void setCheckpointFlushStatusInterval(final int checkpointFlushStatusInterval) {
+        this.checkpointFlushStatusInterval = checkpointFlushStatusInterval;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java
new file mode 100644
index 0000000000..4903e2fdd7
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains classes from {@code jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/}.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
new file mode 100644
index 0000000000..69df2d73a6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB.LogEntryIterator;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.ConfDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.SegmentLogDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.FirstLogIndexCheckpoint;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.apache.ignite.raft.jraft.util.OnlyForTest;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+
+/**
+ * A logStorage implemented by java
+ */
+public class LogitLogStorage implements LogStorage {
+    private static final IgniteLogger LOG = Loggers.forClass(LogitLogStorage.class);
+
+    private static final String           INDEX_STORE_PATH       = "LogIndex";
+    private static final String           SEGMENT_STORE_PATH     = "LogSegment";
+    private static final String           CONF_STORE_PATH        = "LogConf";
+    private static final String           FIRST_INDEX_CHECKPOINT = "FirstLogIndexCheckpoint";
+    private final FirstLogIndexCheckpoint firstLogIndexCheckpoint;
+    private final ReadWriteLock           readWriteLock          = new ReentrantReadWriteLock();
+    private final Lock                    readLock               = this.readWriteLock.readLock();
+    private final Lock                    writeLock              = this.readWriteLock.writeLock();
+    private final StoreOptions            storeOptions;
+    private final RaftOptions             raftOptions;
+    private final String                  indexStorePath;
+    private final String                  segmentStorePath;
+    private final String                  confStorePath;
+    private ConfigurationManager          configurationManager;
+    private LogEntryEncoder               logEntryEncoder;
+    private LogEntryDecoder               logEntryDecoder;
+    private SegmentLogDB                  segmentLogDB;
+    private IndexDB                       indexDB;
+    private ConfDB                        confDB;
+    private LogStoreFactory               logStoreFactory;
+
+    /** Executor that handles prefix truncation. */
+    private final Executor executor;
+
+    public LogitLogStorage(final String path, final StoreOptions storeOptions, RaftOptions raftOptions, Executor executor) {
+        this.indexStorePath = Paths.get(path, INDEX_STORE_PATH).toString();
+        this.segmentStorePath = Paths.get(path, SEGMENT_STORE_PATH).toString();
+        this.confStorePath = Paths.get(path, CONF_STORE_PATH).toString();
+        this.storeOptions = storeOptions;
+        this.raftOptions = raftOptions;
+        this.executor = executor;
+        final String checkPointPath = Paths.get(path, FIRST_INDEX_CHECKPOINT).toString();
+        this.firstLogIndexCheckpoint = new FirstLogIndexCheckpoint(checkPointPath, raftOptions);
+    }
+
+    @Override
+    public boolean init(final LogStorageOptions opts) {
+        Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager");
+        Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory");
+        this.writeLock.lock();
+        try {
+            this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+            this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+            this.configurationManager = opts.getConfigurationManager();
+
+            // Create dbs and recover
+            this.logStoreFactory = new LogStoreFactory(this.storeOptions, raftOptions);
+            this.indexDB = new IndexDB(this.indexStorePath);
+            this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
+            this.confDB = new ConfDB(this.confStorePath);
+            if (!(this.indexDB.init(this.logStoreFactory) && this.segmentLogDB.init(this.logStoreFactory) && this.confDB
+                .init(this.logStoreFactory))) {
+                LOG.warn("Init dbs failed when startup logitLogStorage");
+                return false;
+            }
+
+            this.firstLogIndexCheckpoint.load();
+            return recoverAndLoad();
+        } catch (final IOException e) {
+            LOG.error("Error on load firstLogIndexCheckPoint", e);
+        } finally {
+            this.writeLock.unlock();
+        }
+        return false;
+    }
+
+    public boolean recoverAndLoad() {
+        this.writeLock.lock();
+        try {
+            this.indexDB.recover();
+            this.segmentLogDB.recover();
+            this.confDB.recover();
+
+            // Check consistency
+            if (!checkConsistencyAndAlignLog()) {
+                LOG.warn("Check the consistency and align log failed");
+                return false;
+            }
+
+            // Load configuration to conf manager
+            loadConfiguration();
+
+            // Set first log index
+            if (!this.firstLogIndexCheckpoint.isInit()) {
+                saveFirstLogIndex(this.indexDB.getFirstLogIndex());
+            }
+            LOG.info("Recover dbs and start timingServer success, last recover index:{}",
+                this.indexDB.getLastLogIndex());
+            return true;
+        } catch (final Exception e) {
+            LOG.error("Error on recover db", e);
+            return false;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Check db's consistency and align the log;
+     * @return true if align success;
+     */
+    private boolean checkConsistencyAndAlignLog() {
+        final long lastIndex = this.indexDB.getLastLogIndex();
+        final long lastSegmentIndex = this.segmentLogDB.getLastLogIndex();
+        final long lastConfIndex = this.confDB.getLastLogIndex();
+        if (lastIndex == lastSegmentIndex || lastIndex == lastConfIndex) {
+            return true;
+        }
+        final long maxLogIndex = Math.max(lastSegmentIndex, lastConfIndex);
+        if (lastIndex > maxLogIndex) {
+            // In this case, just align indexDB to the index of max(lastSegmentIndex, lastConfIndex)
+            return this.indexDB.truncateSuffix(maxLogIndex, 0);
+        } else {
+            // In this case, we should generate indexEntry array sorted by index from segmentDB and confDB, then
+            // store indexEntry array to indexDB
+
+            // Step1, lookup last (segment/conf) index in indexDB
+            final Pair<IndexEntry, IndexEntry> lastIndexPair = this.indexDB.lookupLastLogIndexAndPosFromTail();
+            IndexEntry lastSegmentIndexInfo = lastIndexPair.getFirst();
+            IndexEntry lastConfIndexInfo = lastIndexPair.getSecond();
+
+            /**
+             * There exists a bad case, for example
+             * The index db has index entries 1 ~ 12, but all of the index entries are log index, don't contain conf index
+             * The segmentLog db has logs 1 ~ 12 and 16 ~ 19, the conf db has logs 13 ~ 15.
+             * So in this case, the lastConfIndexInfo will be null, but we should set it to the first log position
+             */
+            if (lastSegmentIndexInfo == null) {
+                lastSegmentIndexInfo = new IndexEntry(this.segmentLogDB.getFirstLogIndex(), FileHeader.HEADER_SIZE,
+                    IndexType.IndexSegment.getType());
+            }
+            if (lastConfIndexInfo == null) {
+                lastConfIndexInfo = new IndexEntry(this.confDB.getFirstLogIndex(), FileHeader.HEADER_SIZE,
+                    IndexType.IndexConf.getType());
+            }
+
+            // Step2, Using two-way merging algorithm to construct ordered index entry array
+            final LogEntryIterator segmentLogIterator = this.segmentLogDB.iterator(this.logEntryDecoder,
+                lastSegmentIndexInfo.getLogIndex(), lastSegmentIndexInfo.getPosition());
+            final LogEntryIterator confLogIterator = this.confDB.iterator(this.logEntryDecoder,
+                lastConfIndexInfo.getLogIndex(), lastConfIndexInfo.getPosition());
+            final List<IndexEntry> indexArray = generateOrderedIndexArrayByMergingLogIterator(segmentLogIterator,
+                confLogIterator);
+
+            // Step3, store array to indexDB
+            long maxFlushPosition = this.indexDB.appendBatchIndexAsync(indexArray);
+
+            // Step4, wait for flushing indexDB
+            return this.indexDB.waitForFlush(maxFlushPosition, this.storeOptions.getMaxFlushTimes());
+        }
+    }
+
+    /**
+     * Generate ordered index entry array by using tow-way merging algorithm
+     * @param segmentLogIterator segment log iterator
+     * @param confLogIterator conf log iterator
+     * @return ordered index entry array
+     */
+    public List<IndexEntry> generateOrderedIndexArrayByMergingLogIterator(final LogEntryIterator segmentLogIterator,
+                                                                          final LogEntryIterator confLogIterator) {
+        LogEntry segmentEntry = null, confEntry = null;
+        int segmentPosition = -1, confPosition = -1;
+        final List<IndexEntry> indexEntries = new ArrayList<>();
+        while (true) {
+            // Pull next entry
+            if (segmentEntry == null && segmentLogIterator != null && segmentLogIterator.hasNext()) {
+                segmentEntry = segmentLogIterator.next();
+                segmentPosition = segmentLogIterator.getReadPosition();
+            }
+            if (confEntry == null && confLogIterator != null && confLogIterator.hasNext()) {
+                confEntry = confLogIterator.next();
+                confPosition = confLogIterator.getReadPosition();
+            }
+            if (segmentEntry == null && confEntry == null) {
+                break;
+            }
+            // Merge
+            if (segmentEntry != null && confEntry != null) {
+                if (segmentEntry.getId().getIndex() < confEntry.getId().getIndex()) {
+                    indexEntries.add(new IndexEntry(segmentEntry.getId().getIndex(), segmentPosition,
+                        IndexType.IndexSegment.getType()));
+                    segmentEntry = null;
+                } else {
+                    indexEntries.add(new IndexEntry(confEntry.getId().getIndex(), confPosition, IndexType.IndexConf
+                        .getType()));
+                    confEntry = null;
+                }
+            } else {
+                indexEntries.add(segmentEntry != null ? new IndexEntry(segmentEntry.getId().getIndex(),
+                    segmentPosition, IndexType.IndexSegment.getType()) : new IndexEntry(confEntry.getId().getIndex(),
+                    confPosition, IndexType.IndexConf.getType()));
+                segmentEntry = confEntry = null;
+            }
+        }
+        return indexEntries;
+    }
+
+    private boolean saveFirstLogIndex(final long logIndex) {
+        try {
+            this.firstLogIndexCheckpoint.setFirstLogIndex(logIndex);
+            return this.firstLogIndexCheckpoint.save();
+        } catch (final IOException e) {
+            LOG.error("Error when save first log index", e);
+            return false;
+        }
+    }
+
+    /**
+     * Load configuration logEntries in confDB to configurationManager
+     */
+    public void loadConfiguration() {
+        final LogEntryIterator confIterator = this.confDB.iterator(this.logEntryDecoder);
+        LogEntry entry;
+        while ((entry = confIterator.next()) != null) {
+            if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
+                final ConfigurationEntry confEntry = new ConfigurationEntry();
+                confEntry.setId(new LogId(entry.getId().getIndex(), entry.getId().getTerm()));
+                confEntry.setConf(new Configuration(entry.getPeers(), entry.getLearners()));
+                if (entry.getOldPeers() != null) {
+                    confEntry.setOldConf(new Configuration(entry.getOldPeers(), entry.getOldLearners()));
+                }
+                if (this.configurationManager != null) {
+                    this.configurationManager.add(confEntry);
+                }
+            }
+        }
+    }
+
+    /****************************  Implementation   ********************************/
+
+    @Override
+    public long getFirstLogIndex() {
+        this.readLock.lock();
+        try {
+            if (this.firstLogIndexCheckpoint.firstLogIndex >= 0) {
+                return this.firstLogIndexCheckpoint.firstLogIndex;
+            } else if (this.indexDB.getFirstLogIndex() >= 0) {
+                return this.indexDB.getFirstLogIndex();
+            } else {
+                return 1L;
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getLastLogIndex() {
+        this.readLock.lock();
+        try {
+            if (this.indexDB.getLastLogIndex() >= 0) {
+                // Just use indexDB to get lastLogIndex
+                return this.indexDB.getLastLogIndex();
+            } else {
+                return 0;
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public LogEntry getEntry(final long index) {
+        this.readLock.lock();
+        try {
+            if (index < getFirstLogIndex() || index > getLastLogIndex()) {
+                return null;
+            }
+            final IndexEntry indexEntry = this.indexDB.lookupIndex(index);
+            final int phyPosition = indexEntry.getPosition();
+            final byte logType = indexEntry.getLogType();
+            if (phyPosition != -1) {
+                byte[] logBytes;
+                if (logType == IndexType.IndexSegment.getType()) {
+                    logBytes = this.segmentLogDB.lookupLog(index, phyPosition);
+                } else {
+                    logBytes = this.confDB.lookupLog(index, phyPosition);
+                }
+                if (logBytes != null) {
+                    return this.logEntryDecoder.decode(logBytes);
+                }
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+        return null;
+    }
+
+    @Override
+    public long getTerm(final long index) {
+        final LogEntry entry = getEntry(index);
+        if (entry != null) {
+            return entry.getId().getTerm();
+        }
+        return 0;
+    }
+
+    @Override
+    public boolean appendEntry(final LogEntry entry) {
+        this.readLock.lock();
+        try {
+            final long logIndex = entry.getId().getIndex();
+            final byte[] logData = this.logEntryEncoder.encode(entry);
+            if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
+                return doAppendEntry(logIndex, logData, this.confDB, IndexType.IndexConf, true);
+            } else {
+                return doAppendEntry(logIndex, logData, this.segmentLogDB, IndexType.IndexSegment, true);
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public int appendEntries(final List<LogEntry> entries) {
+        this.readLock.lock();
+        try {
+            int appendCount = 0;
+            final int size = entries.size();
+
+            // Find last log and last conf log
+            int lastLogIndex = -1;
+            int lastConfIndex = -1;
+            for (int i = entries.size() - 1; i >= 0; i--) {
+                final LogEntry entry = entries.get(i);
+                final boolean isConfEntry = (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION);
+                if (isConfEntry && lastConfIndex == -1) {
+                    lastConfIndex = i;
+                } else if (!isConfEntry && lastLogIndex == -1) {
+                    lastLogIndex = i;
+                }
+                if (lastConfIndex >= 0 && lastLogIndex >= 0) {
+                    break;
+                }
+            }
+
+            for (int i = 0; i < size; i++) {
+                final boolean isWaitingFlush = (i == lastLogIndex || i == lastConfIndex);
+                final LogEntry entry = entries.get(i);
+                final long logIndex = entry.getId().getIndex();
+                final byte[] logData = this.logEntryEncoder.encode(entry);
+                if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
+                    if (doAppendEntry(logIndex, logData, this.confDB, IndexType.IndexConf, isWaitingFlush)) {
+                        appendCount++;
+                    }
+                } else {
+                    if (doAppendEntry(logIndex, logData, this.segmentLogDB, IndexType.IndexSegment, isWaitingFlush)) {
+                        appendCount++;
+                    }
+                }
+            }
+
+            return appendCount;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    private boolean doAppendEntry(final long logIndex, final byte[] data, final AbstractDB logDB,
+                                  final IndexType indexType, final boolean isWaitingFlush) {
+        this.readLock.lock();
+        try {
+            if (logDB == null || this.indexDB == null) {
+                return false;
+            }
+
+            // Append log async , get position infos
+            final Pair<Integer, Long> logPair = logDB.appendLogAsync(logIndex, data);
+            if (logPair.getFirst() < 0 || logPair.getSecond() < 0) {
+                return false;
+            }
+
+            final Pair<Integer, Long> indexPair = this.indexDB
+                .appendIndexAsync(logIndex, logPair.getFirst(), indexType);
+            if (indexPair.getFirst() < 0 || indexPair.getSecond() < 0) {
+                return false;
+            }
+
+            // Save first log index
+            if (!this.firstLogIndexCheckpoint.isInit()) {
+                saveFirstLogIndex(logIndex);
+            }
+
+            if (isWaitingFlush) {
+                return waitForFlush(logDB, logPair.getSecond(), indexPair.getSecond());
+            }
+            return true;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    private boolean waitForFlush(final AbstractDB logDB, final long exceptedLogPosition,
+                                 final long exceptedIndexPosition) {
+        final int maxFlushTimes = this.storeOptions.getMaxFlushTimes();
+        // We should flush log db first, because even If the power fails after flushing the log db
+        // we can restore the index db based on the log db.
+        if (!logDB.waitForFlush(exceptedLogPosition, maxFlushTimes)) {
+            return false;
+        }
+        return this.indexDB.waitForFlush(exceptedIndexPosition, maxFlushTimes);
+    }
+
+    @Override
+    public boolean truncatePrefix(final long firstIndexKept) {
+        this.readLock.lock();
+        try {
+            final boolean ret = saveFirstLogIndex(firstIndexKept);
+            if (ret) {
+                Utils.runInThread(executor, () -> {
+                    this.indexDB.truncatePrefix(firstIndexKept);
+                    this.segmentLogDB.truncatePrefix(firstIndexKept);
+                    this.confDB.truncatePrefix(firstIndexKept);
+                });
+            }
+            return ret;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public boolean truncateSuffix(final long lastIndexKept) {
+        final Pair<Integer, Integer> posPair = this.indexDB.lookupFirstLogPosFromLogIndex(lastIndexKept + 1);
+        final int SegmentTruncatePos = posPair.getFirst();
+        final int ConfLogTruncatePos = posPair.getSecond();
+        final int lastIndexKeptPos = this.indexDB.lookupIndex(lastIndexKept).getPosition();
+
+        if (lastIndexKeptPos != -1) {
+            // Truncate indexDB
+            this.indexDB.truncateSuffix(lastIndexKept, 0);
+            // Truncate segmentDB
+            this.segmentLogDB.truncateSuffix(lastIndexKept, SegmentTruncatePos);
+            // Truncate confDB
+            this.confDB.truncateSuffix(lastIndexKept, ConfLogTruncatePos);
+
+            return this.indexDB.getLastLogIndex() == lastIndexKept;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean reset(final long nextLogIndex) {
+        this.writeLock.lock();
+        try {
+            LogEntry entry = getEntry(nextLogIndex);
+            this.indexDB.reset(nextLogIndex);
+            this.segmentLogDB.reset(nextLogIndex);
+            this.confDB.reset(nextLogIndex);
+            if (entry == null) {
+                entry = new LogEntry();
+                entry.setType(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+                entry.setId(new LogId(nextLogIndex, 0));
+            }
+            saveFirstLogIndex(-1);
+            return appendEntry(entry);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        this.writeLock.lock();
+        try {
+            this.indexDB.shutdown();
+            this.segmentLogDB.shutdown();
+            this.confDB.shutdown();
+        } catch (final Exception e) {
+            LOG.error("Error on shutdown dbs", e);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @OnlyForTest
+    public IndexDB getIndexDB() {
+        return indexDB;
+    }
+
+    @OnlyForTest
+    public ConfDB getConfDB() {
+        return confDB;
+    }
+
+    @OnlyForTest
+    public SegmentLogDB getSegmentLogDB() {
+        return segmentLogDB;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
new file mode 100644
index 0000000000..7477abb61d
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import java.io.IOException;
+import java.nio.file.Path;import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.Lifecycle;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile.RecoverResult;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.AbortFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.FlushStatusCheckpoint;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.ServiceManager;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+
+/**
+ * DB parent class that invokes fileManager and anager
+ * and wrappers uniform functions such as recover() etc..
+ */
+public abstract class AbstractDB implements Lifecycle<LogStoreFactory> {
+    private static final IgniteLogger LOG = Loggers.forClass(AbstractDB.class);
+
+    private static final String      FLUSH_STATUS_CHECKPOINT = "FlushStatusCheckpoint";
+    private static final String      ABORT_FILE              = "Abort";
+
+    protected final String           storePath;
+    protected FileManager            fileManager;
+    protected ServiceManager         serviceManager;
+    protected LogStoreFactory        logStoreFactory;
+    protected StoreOptions           storeOptions;
+    protected AbortFile              abortFile;
+    protected FlushStatusCheckpoint  flushStatusCheckpoint;
+    private ScheduledExecutorService checkpointExecutor;
+
+    protected AbstractDB(final String storePath) {
+        this.storePath = storePath;
+    }
+
+    @Override
+    public boolean init(final LogStoreFactory logStoreFactory) {
+        this.logStoreFactory = logStoreFactory;
+        this.storeOptions = logStoreFactory.getStoreOptions();
+        final String flushStatusCheckpointPath = Paths.get(this.storePath, FLUSH_STATUS_CHECKPOINT).toString();
+        final String abortFilePath = Paths.get(this.storePath, ABORT_FILE).toString();
+        this.flushStatusCheckpoint = new FlushStatusCheckpoint(flushStatusCheckpointPath, logStoreFactory.getRaftOptions());
+        this.abortFile = new AbortFile(abortFilePath);
+        this.serviceManager = logStoreFactory.newServiceManager(this);
+        if (!this.serviceManager.init(logStoreFactory)) {
+            return false;
+        }
+        this.fileManager = logStoreFactory.newFileManager(getDBFileType(), this.storePath,
+            this.serviceManager.getAllocateService());
+        this.checkpointExecutor = Executors
+                .newSingleThreadScheduledExecutor(new NamedThreadFactory(getDBName() + "-Checkpoint-Thread-", LOG));
+        final int interval = this.storeOptions.getCheckpointFlushStatusInterval();
+        this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint, interval, interval, TimeUnit.MILLISECONDS);
+        return true;
+    }
+
+    @Override
+    public void shutdown() {
+        doCheckpoint();
+        if (this.serviceManager != null) {
+            this.serviceManager.shutdown();
+        }
+        if (this.fileManager != null) {
+            this.fileManager.shutdown();
+        }
+        if (this.abortFile != null) {
+            this.abortFile.destroy();
+        }
+        this.checkpointExecutor.shutdown();
+    }
+
+    /**
+     * @return this db's name
+     */
+    public String getDBName() {
+        return getClass().getSimpleName();
+    }
+
+    /**
+     * @return this db's file type (index or segmentLog or conf)
+     */
+    public abstract FileType getDBFileType();
+
+    /**
+     * @return this db's file size
+     */
+    public abstract int getDBFileSize();
+
+    /**
+     * Log Entry iterator
+     */
+    public static class LogEntryIterator implements Iterator<LogEntry> {
+        private final AbstractFile[]  files;
+        private int                   currentReadPos;
+        private int                   preReadPos;
+        private int                   currentFileId;
+        private final LogEntryDecoder logEntryDecoder;
+
+        /**
+         *
+         * @param files target files
+         * @param logEntryDecoder decoder
+         * @param currentReadPos the beginning read position in the first file
+         */
+        public LogEntryIterator(final AbstractFile[] files, final LogEntryDecoder logEntryDecoder,
+                                final int currentReadPos) {
+            this.files = files;
+            this.logEntryDecoder = logEntryDecoder;
+            if (files.length > 0) {
+                this.currentFileId = 0;
+                this.currentReadPos = Math.max(currentReadPos, FileHeader.HEADER_SIZE);
+            } else {
+                this.currentFileId = -1;
+                this.currentReadPos = -1;
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.currentFileId >= 0 && this.currentFileId < this.files.length;
+        }
+
+        @Override
+        public LogEntry next() {
+            if (this.currentFileId == -1)
+                { return null; }
+            byte[] data;
+            while (true) {
+                if (currentFileId >= this.files.length)
+                    { return null; }
+                final SegmentFile segmentFile = (SegmentFile) this.files[currentFileId];
+                if (segmentFile == null) {
+                    return null;
+                }
+
+                data = segmentFile.lookupData(this.currentReadPos);
+                if (data == null) {
+                    // Reach file end
+                    this.currentFileId += 1;
+                    this.currentReadPos = FileHeader.HEADER_SIZE;
+                } else {
+                    this.preReadPos = this.currentReadPos;
+                    this.currentReadPos += SegmentFile.getWriteBytes(data);
+                    return this.logEntryDecoder.decode(data);
+                }
+            }
+        }
+
+        public int getReadPosition() {
+            return this.preReadPos;
+        }
+    }
+
+    public LogEntryIterator iterator(final LogEntryDecoder logEntryDecoder, long beginIndex, int beginPosition) {
+        final AbstractFile[] files = this.fileManager.findFileFromLogIndex(beginIndex);
+        return new LogEntryIterator(files, logEntryDecoder, beginPosition);
+    }
+
+    public LogEntryIterator iterator(final LogEntryDecoder logEntryDecoder) {
+        final AbstractFile[] files = this.fileManager.copyFiles();
+        return new LogEntryIterator(files, logEntryDecoder, 0);
+    }
+
+    /**
+     * Recover when startUp
+     */
+    public synchronized void recover() {
+        final List<AbstractFile> files = this.fileManager.loadExistedFiles();
+        try {
+            if (files.isEmpty()) {
+                this.fileManager.setFlushedPosition(0);
+                this.abortFile.create();
+                return;
+            }
+            this.flushStatusCheckpoint.load();
+            final boolean normalExit = !this.abortFile.exists();
+            long recoverOffset;
+            int startRecoverIndex;
+            if (!normalExit) {
+                // Abnormal exit, should recover from lastCheckpointFile
+                startRecoverIndex = findLastCheckpointFile(files, this.flushStatusCheckpoint);
+                LOG.info("{} {} did not exit normally, will try to recover files from fileIndex:{}.", getDBName(),
+                    this.storePath, startRecoverIndex);
+            } else {
+                // Normal exit , just recover last file
+                startRecoverIndex = files.size() - 1;
+            }
+            recoverOffset = (long) startRecoverIndex * (long) getDBFileSize();
+            recoverOffset = recoverFiles(startRecoverIndex, files, recoverOffset);
+            this.fileManager.setFlushedPosition(recoverOffset);
+
+            if (normalExit) {
+                this.abortFile.create();
+            } else {
+                this.abortFile.touch();
+            }
+        } catch (final Exception e) {
+            LOG.error("Error on recover {} files , store path: {} , {}", getDBName(), this.storePath, e);
+            throw new RuntimeException(e);
+        } finally {
+            startServiceManager();
+        }
+    }
+
+    /**
+     * Recover files
+     * @return last recover offset
+     */
+    protected long recoverFiles(final int startRecoverIndex, final List<AbstractFile> files, long processOffset) {
+        AbstractFile preFile = null;
+        boolean needTruncate = false;
+        for (int index = 0; index < files.size(); index++) {
+            final AbstractFile file = files.get(index);
+            final boolean isLastFile = index == files.size() - 1;
+
+            if (index < startRecoverIndex) {
+                // Update files' s position when don't need to recover
+                file.updateAllPosition(getDBFileSize());
+            } else {
+                final RecoverResult result = file.recover();
+                if (result.recoverSuccess()) {
+                    if (result.recoverTotal()) {
+                        processOffset += isLastFile ? result.getLastOffset() : getDBFileSize();
+                    } else {
+                        processOffset += result.getLastOffset();
+                        needTruncate = true;
+                    }
+                } else {
+                    needTruncate = true;
+                }
+            }
+
+            if (preFile != null) {
+                preFile.setLastLogIndex(file.getFirstLogIndex() - 1);
+            }
+            preFile = file;
+
+            if (needTruncate) {
+                // Error on recover files , truncate to processOffset
+                LOG.warn("Try to truncate files to processOffset:{} when recover files", processOffset);
+                this.fileManager.truncateSuffixByOffset(processOffset);
+                break;
+            }
+        }
+        return processOffset;
+    }
+
+    private int findLastCheckpointFile(final List<AbstractFile> files, final FlushStatusCheckpoint checkpoint) {
+        if (checkpoint == null || checkpoint.fileName == null) {
+            return 0;
+        }
+        for (int fileIndex = 0; fileIndex < files.size(); fileIndex++) {
+            final AbstractFile file = files.get(fileIndex);
+            if (getFileName(file).equalsIgnoreCase(checkpoint.fileName)) {
+                return fileIndex;
+            }
+        }
+        return 0;
+    }
+
+    private static String getFileName(AbstractFile file) {
+        return Path.of(file.getFilePath()).getFileName().toString();
+    }
+
+    private void doCheckpoint() {
+        long flushedPosition = getFlushedPosition();
+        if (flushedPosition % getDBFileSize() == 0) {
+            flushedPosition -= 1;
+        }
+        final AbstractFile file = this.fileManager.findFileByOffset(flushedPosition, false);
+        try {
+            if (file != null) {
+                this.flushStatusCheckpoint.setFileName(getFileName(file));
+                this.flushStatusCheckpoint.setFlushPosition(flushedPosition);
+                this.flushStatusCheckpoint.setLastLogIndex(getLastLogIndex());
+                this.flushStatusCheckpoint.save();
+            }
+        } catch (final IOException e) {
+            LOG.error("Error when do checkpoint in db:{}", getDBName());
+        }
+    }
+
+    /**
+     * Write the data and return it's wrote position.
+     * @param data logEntry data
+     * @return (wrotePosition, expectFlushPosition)
+     */
+    public Pair<Integer, Long> appendLogAsync(final long logIndex, final byte[] data) {
+        final int waitToWroteSize = SegmentFile.getWriteBytes(data);
+        final SegmentFile segmentFile = (SegmentFile) this.fileManager.getLastFile(logIndex, waitToWroteSize, true);
+        if (segmentFile != null) {
+            final int pos = segmentFile.appendData(logIndex, data);
+            final long expectFlushPosition = segmentFile.getFileFromOffset() + pos + waitToWroteSize;
+            return Pair.of(pos, expectFlushPosition);
+        }
+        return Pair.of(-1, (long) -1);
+    }
+
+    /**
+     * Read log from the segmentFile.
+     *
+     * @param logIndex the log index
+     * @param pos      the position to read
+     * @return read data
+     */
+    public byte[] lookupLog(final long logIndex, final int pos) {
+        final SegmentFile segmentFile = (SegmentFile) this.fileManager.findFileByLogIndex(logIndex, false);
+        if (segmentFile != null) {
+            final long targetFlushPosition = segmentFile.getFileFromOffset() + pos;
+            if (targetFlushPosition <= getFlushedPosition()) {
+                return segmentFile.lookupData(logIndex, pos);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Flush db files and wait for flushPosition >= maxExpectedFlushPosition
+     * @return true if flushPosition >= maxExpectedFlushPosition
+     */
+    public boolean waitForFlush(final long maxExpectedFlushPosition, final int maxFlushTimes) {
+        int cnt = 0;
+        while (getFlushedPosition() < maxExpectedFlushPosition) {
+            flush();
+            cnt++;
+            if (cnt > maxFlushTimes) {
+                LOG.error("Try flush db {} times, but the flushPosition {} can't exceed expectedFlushPosition {}",
+                    maxFlushTimes, getFlushedPosition(), maxExpectedFlushPosition);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void startServiceManager() {
+        this.serviceManager.start();
+    }
+
+    public boolean flush() {
+        return this.fileManager.flush();
+    }
+
+    public boolean truncatePrefix(final long firstIndexKept) {
+        return this.fileManager.truncatePrefix(firstIndexKept);
+
+    }
+
+    public boolean truncateSuffix(final long lastIndexKept, final int pos) {
+        if (this.fileManager.truncateSuffix(lastIndexKept, pos)) {
+            doCheckpoint();
+        }
+        return false;
+    }
+
+    public boolean reset(final long nextLogIndex) {
+        this.flushStatusCheckpoint.destroy();
+        this.fileManager.reset(nextLogIndex);
+        doCheckpoint();
+        return true;
+    }
+
+    public long getFlushedPosition() {
+        return this.fileManager.getFlushedPosition();
+    }
+
+    public StoreOptions getStoreOptions() {
+        return this.storeOptions;
+    }
+
+    public String getStorePath() {
+        return this.storePath;
+    }
+
+    public long getFirstLogIndex() {
+        return this.fileManager.getFirstLogIndex();
+    }
+
+    public long getLastLogIndex() {
+        return this.fileManager.getLastLogIndex();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
new file mode 100644
index 0000000000..a79ee0ace3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+
+/**
+ * DB that stores configuration type log entry
+ */
+public class ConfDB extends AbstractDB {
+
+    public ConfDB(final String storePath) {
+        super(storePath);
+    }
+
+    @Override
+    public FileType getDBFileType() {
+        return FileType.FILE_CONFIGURATION;
+    }
+
+    @Override
+    public int getDBFileSize() {
+        return this.storeOptions.getConfFileSize();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
new file mode 100644
index 0000000000..012e740bb1
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import java.util.List;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+
+/**
+ * DB that stores index entry
+ */
+public class IndexDB extends AbstractDB {
+
+    public IndexDB(final String storePath) {
+        super(storePath);
+    }
+
+    /**
+     * Append IndexEntry(logIndex , position, indexType)
+     * @return (wrotePosition, expectFlushPosition)
+     */
+    public Pair<Integer, Long> appendIndexAsync(final long logIndex, final int position, final IndexType type) {
+        final long lastIndex = getLastLogIndex();
+        if (lastIndex != -1 && logIndex != getLastLogIndex() + 1) {
+            return Pair.of(-1, (long) -1);
+        }
+        final int waitToWroteSize = IndexEntry.INDEX_SIZE;
+        final IndexFile indexFile = (IndexFile) this.fileManager.getLastFile(logIndex, waitToWroteSize, true);
+        if (indexFile != null) {
+            final int pos = indexFile.appendIndex(logIndex, position, type.getType());
+            final long expectFlushPosition = indexFile.getFileFromOffset() + pos + waitToWroteSize;
+            return Pair.of(pos, expectFlushPosition);
+        }
+        return Pair.of(-1, (long) -1);
+    }
+
+    /**
+     * Append IndexEntryArray(logIndex , position, indexType)
+     * @return max expectFlushPosition
+     */
+    public Long appendBatchIndexAsync(final List<IndexEntry> indexArray) {
+        long maxFlushPosition = -1;
+        for (int i = 0; i < indexArray.size(); i++) {
+            final IndexEntry index = indexArray.get(i);
+            final long logIndex = index.getLogIndex();
+            if (logIndex == getLastLogIndex() + 1) {
+                final Pair<Integer, Long> flushPair = appendIndexAsync(logIndex, index.getPosition(),
+                    index.getLogType() == 1 ? IndexType.IndexSegment : IndexType.IndexConf);
+                maxFlushPosition = Math.max(maxFlushPosition, flushPair.getSecond());
+            }
+        }
+        return maxFlushPosition;
+    }
+
+    /**
+     * Lookup IndexEntry by logIndex
+     */
+    public IndexEntry lookupIndex(final long logIndex) {
+        final IndexFile indexFile = (IndexFile) this.fileManager.findFileByLogIndex(logIndex, false);
+        if (indexFile != null) {
+            final int indexPos = indexFile.calculateIndexPos(logIndex);
+            final long targetFlushPosition = indexFile.getFileFromOffset() + indexPos;
+            if (targetFlushPosition <= getFlushedPosition()) {
+                return indexFile.lookupIndex(logIndex);
+            }
+        }
+        return IndexEntry.newInstance();
+    }
+
+    /**
+     * Search the first segment log index and first conf log index from logIndex
+     * @return pair of first log pos(segment / conf)
+     */
+    public Pair<Integer, Integer> lookupFirstLogPosFromLogIndex(final long logIndex) {
+        final long lastLogIndex = getLastLogIndex();
+        int firstSegmentPos = -1;
+        int firstConfPos = -1;
+        long index = logIndex;
+        while (index <= lastLogIndex) {
+            final IndexEntry indexEntry = lookupIndex(index);
+            if (indexEntry.getLogType() == IndexType.IndexSegment.getType() && firstSegmentPos == -1) {
+                firstSegmentPos = indexEntry.getPosition();
+            } else if (indexEntry.getLogType() == IndexType.IndexConf.getType() && firstConfPos == -1) {
+                firstConfPos = indexEntry.getPosition();
+            }
+            if (firstSegmentPos > 0 && firstConfPos > 0) {
+                break;
+            }
+            index++;
+        }
+        return Pair.of(firstSegmentPos, firstConfPos);
+    }
+
+    /**
+     * Search the last segment log index and last conf log index from tail
+     * @return pair of IndexEntry<last log index and position> (segment / conf)
+     */
+    public Pair<IndexEntry, IndexEntry> lookupLastLogIndexAndPosFromTail() {
+        final long lastLogIndex = getLastLogIndex();
+        final long firstLogIndex = getFirstLogIndex();
+        IndexEntry lastSegmentIndex = null, lastConfIndex = null;
+        long index = lastLogIndex;
+        while (index >= firstLogIndex) {
+            final IndexEntry indexEntry = lookupIndex(index);
+            indexEntry.setLogIndex(index);
+            if (indexEntry.getLogType() == IndexType.IndexSegment.getType() && lastSegmentIndex == null) {
+                lastSegmentIndex = indexEntry;
+            } else if (indexEntry.getLogType() == IndexType.IndexConf.getType() && lastConfIndex == null) {
+                lastConfIndex = indexEntry;
+            }
+            if (lastSegmentIndex != null && lastConfIndex != null) {
+                break;
+            }
+            index--;
+        }
+        return Pair.of(lastSegmentIndex, lastConfIndex);
+    }
+
+    @Override
+    public FileType getDBFileType() {
+        return FileType.FILE_INDEX;
+    }
+
+    @Override
+    public int getDBFileSize() {
+        return this.storeOptions.getIndexFileSize();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
new file mode 100644
index 0000000000..370194bd47
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+
+/**
+ * DB that stores logEntry
+ */
+public class SegmentLogDB extends AbstractDB {
+
+    public SegmentLogDB(final String storePath) {
+        super(storePath);
+    }
+
+    @Override
+    public FileType getDBFileType() {
+        return FileType.FILE_SEGMENT;
+    }
+
+    @Override
+    public int getDBFileSize() {
+        return this.storeOptions.getSegmentFileSize();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
new file mode 100644
index 0000000000..8b262ddfc3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.factory;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.ServiceManager;
+
+/**
+ * The factory that provides uniform construction functions
+ */
+public class LogStoreFactory {
+    private final StoreOptions storeOptions;
+    private final RaftOptions raftOptions;
+
+    public LogStoreFactory(final StoreOptions opts, RaftOptions raftOptions) {
+        this.storeOptions = opts;
+    this.raftOptions = raftOptions;}
+
+    /**
+     * Create new file(index/segment/conf)
+     */
+    public AbstractFile newFile(final FileType fileType, final String filePath) {
+        return isIndex(fileType) ? new IndexFile(filePath, this.storeOptions.getIndexFileSize()) : //
+            isConf(fileType) ? new SegmentFile(filePath, this.storeOptions.getConfFileSize()) : //
+                new SegmentFile(filePath, this.storeOptions.getSegmentFileSize());
+    }
+
+    /**
+     * Create new fileManager(index/segment/conf)
+     */
+    public FileManager newFileManager(final FileType fileType, final String storePath,
+                                      final AllocateFileService allocateService) {
+        final FileManager.FileManagerBuilder fileManagerBuilder = FileManager.newBuilder() //
+            .fileType(fileType) //
+            .fileSize(getFileSize(fileType)) //
+            .storePath(storePath) //
+            .logStoreFactory(this) //
+            .allocateService(allocateService);
+        return fileManagerBuilder.build();
+    }
+
+    /**
+     * Create new serviceManager
+     */
+    public ServiceManager newServiceManager(final AbstractDB abstractDB) {
+        return new ServiceManager(abstractDB);
+    }
+
+    /**
+     * Create new allocateFileService
+     */
+    public AllocateFileService newAllocateService(final AbstractDB abstractDB) {
+        return new AllocateFileService(abstractDB, this);
+    }
+
+    public int getFileSize(final FileType fileType) {
+        return isIndex(fileType) ? this.storeOptions.getIndexFileSize() : //
+            isConf(fileType) ? this.storeOptions.getConfFileSize() : //
+                this.storeOptions.getSegmentFileSize();
+    }
+
+    private boolean isIndex(final FileType fileType) {
+        return fileType == FileType.FILE_INDEX;
+    }
+
+    private boolean isConf(final FileType fileType) {
+        return fileType == FileType.FILE_CONFIGURATION;
+    }
+
+    public StoreOptions getStoreOptions() {
+        return this.storeOptions;
+    }
+
+    public RaftOptions getRaftOptions() {
+        return raftOptions;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
new file mode 100644
index 0000000000..ac0e4d36a6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.storage.logit.LibC;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ReferenceResource;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.apache.ignite.raft.jraft.util.Utils;
+
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * File parent class that wrappers uniform functions such as mmap(), flush() etc..
+ */
+public abstract class AbstractFile extends ReferenceResource {
+    private static final IgniteLogger LOG = Loggers.forClass(AbstractFile.class);
+
+    protected static final int    BLANK_HOLE_SIZE = 64;
+
+    protected static final byte   FILE_END_BYTE   = 'x';
+
+    protected String              filePath;
+
+    // The size of this file
+    protected int                 fileSize;
+
+    protected File                file;
+
+    protected FileHeader          header;
+
+    protected MappedByteBuffer    mappedByteBuffer;
+
+    // Current write position
+    protected final AtomicInteger wrotePosition   = new AtomicInteger(0);
+
+    // Current flush position
+    protected final AtomicInteger flushedPosition = new AtomicInteger(0);
+
+    protected final ReadWriteLock readWriteLock   = new ReentrantReadWriteLock();
+    protected final Lock          readLock        = this.readWriteLock.readLock();
+    protected final Lock          writeLock       = this.readWriteLock.writeLock();
+    private volatile boolean      isMapped        = false;
+    private final ReentrantLock   mapLock         = new ReentrantLock();
+
+    public AbstractFile(final String filePath, final int fileSize, final boolean isMapped) {
+        initAndMap(filePath, fileSize, isMapped);
+        this.header = new FileHeader();
+    }
+
+    public void initAndMap(final String filePath, final int fileSize, final boolean isMapped) {
+        this.filePath = filePath;
+        this.fileSize = fileSize;
+        this.file = new File(filePath);
+        this.mapLock.lock();
+        try {
+            if (!this.file.exists()) {
+                if (!this.file.createNewFile()) {
+                    throw new RuntimeException("Failed to create new file");
+                }
+            }
+            if (isMapped) {
+                this.map(MapMode.READ_WRITE);
+            }
+        } catch (final Throwable t) {
+            LOG.error("Error happen when create file:{}", filePath, t);
+        } finally {
+            this.mapLock.unlock();
+        }
+    }
+
+    public void mapInIfNecessary() {
+        if (!isMapped()) {
+            this.map(MapMode.READ_ONLY);
+        }
+    }
+
+    public void map(final MapMode mapMode) {
+        this.mapLock.lock();
+        try {
+            if (!isMapped()) {
+                try (final RandomAccessFile randomAccessFile = new RandomAccessFile(this.file, "rw");
+                        final FileChannel fileChannel = randomAccessFile.getChannel()) {
+                    this.mappedByteBuffer = fileChannel.map(mapMode, 0, this.fileSize);
+                    this.isMapped = true;
+                }
+            }
+        } catch (final Throwable t) {
+            LOG.error("map file {} failed , {}", getFilePath(), t);
+            throw new RuntimeException(t);
+        } finally {
+            this.mapLock.unlock();
+        }
+    }
+
+    public void unmmap() {
+        if (isMapped()) {
+            this.mapLock.lock();
+            try {
+                if (isMapped()) {
+                    this.mappedByteBuffer.force();
+                    this.flushedPosition.set(getWrotePosition());
+                    if (this.mappedByteBuffer != null) {
+                        if (Platform.isLinux()) {
+                            hintUnload();
+                        }
+                        Utils.unmap(this.mappedByteBuffer);
+                    }
+                    this.isMapped = false;
+                }
+            } catch (final Throwable t) {
+                LOG.error("error unmap file {} , {}", getFilePath(), t);
+                throw new RuntimeException(t);
+            } finally {
+                this.mapLock.unlock();
+            }
+        }
+    }
+
+    public boolean isMapped() {
+        return this.isMapped;
+    }
+
+    public static class RecoverResult {
+        // Is recover success
+        private final boolean recoverSuccess;
+        // Is recover total data or encounter an error when recover
+        private final boolean recoverTotal;
+        // Last recover offset
+        private final int     lastOffset;
+
+        public RecoverResult(final boolean recoverSuccess, final boolean recoverTotal, final int lastOffset) {
+            this.recoverSuccess = recoverSuccess;
+            this.recoverTotal = recoverTotal;
+            this.lastOffset = lastOffset;
+        }
+
+        public static RecoverResult newInstance(final boolean isSuccess, final boolean isRecoverTotal,
+                                                final int recoverOffset) {
+            return new RecoverResult(isSuccess, isRecoverTotal, recoverOffset);
+        }
+
+        public int getLastOffset() {
+            return lastOffset;
+        }
+
+        public boolean recoverSuccess() {
+            return recoverSuccess;
+        }
+
+        public boolean recoverTotal() {
+            return recoverTotal;
+        }
+    }
+
+    /**
+     * Recover logic
+     * @return the last recover position
+     */
+    public RecoverResult recover() {
+        if (!loadHeader()) {
+            return RecoverResult.newInstance(false, false, -1);
+        }
+        final ByteBuffer byteBuffer = sliceByteBuffer();
+        int recoverPosition = this.header.getHeaderSize();
+        int recoverCnt = 0;
+        int lastLogPosition = recoverPosition;
+        boolean isFileEnd = false;
+        final long start = Utils.monotonicMs();
+        while (true) {
+            byteBuffer.position(recoverPosition);
+            final CheckDataResult checkResult = checkData(byteBuffer);
+            if (checkResult == CheckDataResult.FILE_END) {
+                // File end
+                isFileEnd = true;
+                break;
+            } else if (checkResult == CheckDataResult.CHECK_FAIL) {
+                // Check fail
+                break;
+            } else {
+                // Check success
+                recoverCnt++;
+                lastLogPosition = recoverPosition;
+                recoverPosition += checkResult.size;
+            }
+        }
+        updateAllPosition(recoverPosition);
+        onRecoverDone(lastLogPosition);
+        LOG.info("Recover file {} cost {} millis, recoverPosition:{}, recover logs:{}, lastLogIndex:{}", getFilePath(),
+            Utils.monotonicMs() - start, recoverPosition, recoverCnt, getLastLogIndex());
+        final boolean isRecoverTotal = isFileEnd || (recoverPosition == this.fileSize);
+        return RecoverResult.newInstance(true, isRecoverTotal, recoverPosition);
+    }
+
+    public enum CheckDataResult {
+        CHECK_SUCCESS(1), // If check success, return dataSize
+        CHECK_FAIL(-1), // If check failed, return -1
+        FILE_END(0); // If come to file end, return 0
+
+        private int size;
+
+        CheckDataResult(final int pos) {
+            this.size = pos;
+        }
+
+        public void setSize(final int pos) {
+            this.size = pos;
+        }
+    }
+
+    /**
+     *
+     * @return check result
+     */
+    public abstract CheckDataResult checkData(final ByteBuffer byteBuffer);
+
+    /**
+     * Trigger function when recover done
+     * @param recoverPosition the recover position
+     */
+    public abstract void onRecoverDone(final int recoverPosition);
+
+    /**
+     * Truncate file entries to logIndex
+     * @param logIndex the target logIndex
+     * @param pos the position of this entry, this parameter is needed only if this file is a segmentFile
+     */
+    public abstract int truncate(final long logIndex, final int pos);
+
+    /**
+     * Append data to file end
+     * @param logIndex logEntry index
+     * @param data data array
+     * @return wrote position
+     */
+    protected int doAppend(final long logIndex, final byte[] data) {
+        this.writeLock.lock();
+        try {
+            int wrotePos = getWrotePosition();
+            // First time append , set firstLogIndex to header
+            if (this.header.isBlank()) {
+                this.header.setFirstLogIndex(logIndex);
+                saveHeader();
+                wrotePos = this.header.getHeaderSize();
+            }
+            // Write data and update header
+            final ByteBuffer buffer = sliceByteBuffer();
+            put(buffer, wrotePos, data);
+            setWrotePosition(wrotePos + data.length);
+            this.header.setLastLogIndex(logIndex);
+            return wrotePos;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Flush data to disk
+     * @return The current flushed position
+     */
+    public int flush() {
+        if (hold()) {
+            final int value = getWrotePosition();
+            try {
+                this.mappedByteBuffer.force();
+            } catch (final Throwable e) {
+                LOG.error("Error occurred when force data to disk.", e);
+                throw new RuntimeException(e);
+            }
+            setFlushPosition(value);
+            release();
+        } else {
+            LOG.warn("In flush, hold failed, flush offset = {}.", getFlushedPosition());
+            setFlushPosition(getWrotePosition());
+        }
+        return getFlushedPosition();
+    }
+
+    public boolean shutdown(final long intervalForcibly, final boolean isDestroy) {
+        shutdown(intervalForcibly);
+        if (isCleanupOver()) {
+            try {
+                if (isDestroy) {
+                    return IgniteUtils.deleteIfExists(this.file.toPath());
+                }
+                return true;
+            } catch (final Throwable t) {
+                LOG.error("Close file channel failed, {}", getFilePath(), t);
+                throw new RuntimeException(t);
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean cleanup(final long currentRef) {
+        if (isAvailable()) {
+            return false;
+        }
+        if (isCleanupOver() || !isMapped()) {
+            return true;
+        }
+        unmmap();
+        return true;
+    }
+
+    /**
+     * Load file header
+     */
+    public boolean loadHeader() {
+        final ByteBuffer byteBuffer = sliceByteBuffer();
+        byteBuffer.position(0);
+        return this.header.decode(byteBuffer);
+    }
+
+    /**
+     * Save file header
+     * Dont need to flush , header will be flushed by flushService
+     */
+    public void saveHeader() {
+        // save header to mappedByteBuffer
+        final ByteBuffer header = this.header.encode();
+        final ByteBuffer byteBuffer = sliceByteBuffer();
+        byteBuffer.position(0);
+        byteBuffer.put(header);
+    }
+
+    /**
+     * Fill empty bytes in this file end when this fill has no sufficient free space to store one message
+     */
+    public void fillEmptyBytesInFileEnd() {
+        this.writeLock.lock();
+        try {
+            final int wrotePosition = getWrotePosition();
+            final ByteBuffer byteBuffer = sliceByteBuffer();
+            for (int i = wrotePosition; i < this.fileSize; i++) {
+                byteBuffer.put(i, FILE_END_BYTE);
+            }
+            setWrotePosition(this.fileSize);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Clear data in [startPos, startPos+64).
+     */
+    public void clear(final int startPos) {
+        this.writeLock.lock();
+        try {
+            if (startPos < 0 || startPos > this.fileSize) {
+                return;
+            }
+            final int endPos = Math.min(this.fileSize, startPos + BLANK_HOLE_SIZE);
+            for (int i = startPos; i < endPos; i++) {
+                this.mappedByteBuffer.put(i, (byte) 0);
+            }
+            this.mappedByteBuffer.force();
+            LOG.info("File {} cleared data in [{}, {}].", this.filePath, startPos, endPos);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public void put(final ByteBuffer buffer, final int index, final byte[] data) {
+        for (int i = 0; i < data.length; i++) {
+            buffer.put(index + i, data[i]);
+        }
+    }
+
+    public long getFirstLogIndex() {
+        return this.header.getFirstLogIndex();
+    }
+
+    public long getLastLogIndex() {
+        return this.header.getLastLogIndex();
+    }
+
+    public void setLastLogIndex(final long lastLogIndex) {
+        this.header.setLastLogIndex(lastLogIndex);
+    }
+
+    public void setFileFromOffset(final long fileFromOffset) {
+        this.header.setFileFromOffset(fileFromOffset);
+    }
+
+    public long getFileFromOffset() {
+        return this.header.getFileFromOffset();
+    }
+
+    /**
+     * Returns true when the segment file contains the log index.
+     *
+     * @param logIndex the log index
+     * @return true if the segment file contains the log index, otherwise return false
+     */
+    public boolean contains(final long logIndex) {
+        this.readLock.lock();
+        try {
+            return logIndex >= this.header.getFirstLogIndex() && logIndex <= this.getLastLogIndex();
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public ByteBuffer sliceByteBuffer() {
+        return this.mappedByteBuffer.slice();
+    }
+
+    public void warmupFile() {
+        if (!isMapped()) {
+            return;
+        }
+        // Lock memory
+        if (Platform.isLinux()) {
+            hintLoad();
+        }
+    }
+
+    public void hintLoad() {
+        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        Pointer pointer = new Pointer(address);
+
+        long beginTime = Utils.monotonicMs();
+        if (Platform.isLinux()) {
+            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
+            LOG.info("madvise(MADV_WILLNEED) {} {} {} ret = {} time consuming = {}", address, this.filePath,
+                this.fileSize, ret, Utils.monotonicMs() - beginTime);
+        }
+    }
+
+    public void hintUnload() {
+        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+        Pointer pointer = new Pointer(address);
+
+        long beginTime = Utils.monotonicMs();
+        if (Platform.isLinux()) {
+            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_DONTNEED);
+            LOG.info("madvise(MADV_DONTNEED) {} {} {} ret = {} time consuming = {}", address, this.filePath,
+                this.fileSize, ret, Utils.monotonicMs() - beginTime);
+        }
+    }
+
+    public void reset() {
+        setWrotePosition(0);
+        setFlushPosition(0);
+        this.header.setFirstLogIndex(FileHeader.BLANK_OFFSET_INDEX);
+        flush();
+    }
+
+    public int getWrotePosition() {
+        return wrotePosition.get();
+    }
+
+    public void setWrotePosition(final int position) {
+        this.wrotePosition.set(position);
+    }
+
+    public int getFlushedPosition() {
+        return flushedPosition.get();
+    }
+
+    public void setFlushPosition(final int position) {
+        this.flushedPosition.set(position);
+    }
+
+    /**
+     * Update  flush / wrote position to pos
+     * @param pos target pos
+     */
+    public void updateAllPosition(final int pos) {
+        setWrotePosition(pos);
+        setFlushPosition(pos);
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public boolean reachesFileEndBy(final int waitToWroteSize) {
+        return getWrotePosition() + waitToWroteSize > getFileSize();
+    }
+
+    public int getFileSize() {
+        return fileSize;
+    }
+
+    public boolean isBlank() {
+        return this.header.isBlank();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
new file mode 100644
index 0000000000..d68f54e5f4
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * File header:
+ * <pre>
+ *   magic bytes       first log index   file from offset       reserved
+ *   [0x20 0x20]      [... 8 bytes...]   [... 8 bytes...]   [... 8 bytes...]
+ * <pre>
+ */
+public class FileHeader {
+    private static final IgniteLogger LOG = Loggers.forClass(FileHeader.class);
+
+    @SuppressWarnings("unused")
+    private static final long   RESERVED_FLAG      = 0L;
+
+    public static final int     HEADER_SIZE        = 26;
+
+    public static final long    BLANK_OFFSET_INDEX = -99;
+
+    private volatile long       FirstLogIndex      = BLANK_OFFSET_INDEX;
+
+    private long                FileFromOffset     = -1;
+
+    private volatile long       LastLogIndex       = BLANK_OFFSET_INDEX;
+
+    private static final byte   MAGIC              = 0x20;
+
+    public FileHeader() {
+        super();
+    }
+
+    public ByteBuffer encode() {
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
+        buffer.put(MAGIC);
+        buffer.put(MAGIC);
+        buffer.putLong(this.FirstLogIndex);
+        buffer.putLong(this.FileFromOffset);
+        buffer.putLong(RESERVED_FLAG);
+        buffer.flip();
+        return buffer;
+    }
+
+    public boolean decode(final ByteBuffer buffer) {
+        if (buffer == null || buffer.remaining() < HEADER_SIZE) {
+            LOG.error("Fail to decode file header, invalid buffer length: {}", buffer == null ? 0 : buffer.remaining());
+            return false;
+        }
+        if (buffer.get() != MAGIC) {
+            return false;
+        }
+        if (buffer.get() != MAGIC) {
+            return false;
+        }
+        this.FirstLogIndex = buffer.getLong();
+        this.FileFromOffset = buffer.getLong();
+        return true;
+    }
+
+    public long getFirstLogIndex() {
+        return FirstLogIndex;
+    }
+
+    public void setFirstLogIndex(final long firstLogIndex) {
+        this.FirstLogIndex = firstLogIndex;
+    }
+
+    public long getFileFromOffset() {
+        return FileFromOffset;
+    }
+
+    public void setFileFromOffset(final long fileFromOffset) {
+        this.FileFromOffset = fileFromOffset;
+    }
+
+    public long getLastLogIndex() {
+        return LastLogIndex;
+    }
+
+    public void setLastLogIndex(final long lastLogIndex) {
+        this.LastLogIndex = lastLogIndex;
+    }
+
+    public int getHeaderSize() {
+        return HEADER_SIZE;
+    }
+
+    public boolean isBlank() {
+        return this.FirstLogIndex == BLANK_OFFSET_INDEX;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
new file mode 100644
index 0000000000..ea784bda31
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.apache.ignite.raft.jraft.util.Requires;
+
+/**
+ * Uses list to manage AbstractFile(Index,Segment) by logIndex and offset
+ */
+public class FileManager {
+    private static final IgniteLogger LOG = Loggers.forClass(FileManager.class);
+
+    private final String              storePath;
+
+    private final FileType            fileType;
+
+    private final StoreOptions        storeOptions;
+
+    private final int                 fileSize;
+
+    private final AllocateFileService allocateService;
+
+    private final LogStoreFactory     logStoreFactory;
+
+    private final List<AbstractFile>  files     = new ArrayList<>();
+
+    private volatile long             flushedPosition;
+
+    private final ReadWriteLock       lock      = new ReentrantReadWriteLock();
+    private final Lock                readLock  = lock.readLock();
+    private final Lock                writeLock = lock.writeLock();
+
+    public FileManager(final FileType fileType, final int fileSize, final String storePath,
+                       final LogStoreFactory logStoreFactory, final AllocateFileService allocateService) {
+        this.storePath = storePath;
+        this.fileType = fileType;
+        this.allocateService = allocateService;
+        this.storeOptions = logStoreFactory.getStoreOptions();
+        this.logStoreFactory = logStoreFactory;
+        this.fileSize = fileSize;
+    }
+
+    public static FileManagerBuilder newBuilder() {
+        return new FileManagerBuilder();
+    }
+
+    public static class FileManagerBuilder {
+        private FileType            fileType;
+
+        private String              storePath;
+
+        private Integer             fileSize;
+
+        private LogStoreFactory     logStoreFactory;
+
+        private AllocateFileService allocateService;
+
+        public FileManagerBuilder storePath(final String storePath) {
+            this.storePath = storePath;
+            return this;
+        }
+
+        public FileManagerBuilder fileType(final FileType fileType) {
+            this.fileType = fileType;
+            return this;
+        }
+
+        public FileManagerBuilder logStoreFactory(final LogStoreFactory logStoreFactory) {
+            this.logStoreFactory = logStoreFactory;
+            return this;
+        }
+
+        public FileManagerBuilder fileSize(final Integer abstractFileSize) {
+            this.fileSize = abstractFileSize;
+            return this;
+        }
+
+        public FileManagerBuilder allocateService(final AllocateFileService allocateService) {
+            this.allocateService = allocateService;
+            return this;
+        }
+
+        public FileManager build() {
+            Requires.requireNonNull(this.storePath, "storePath");
+            Requires.requireNonNull(this.fileType, "fileType");
+            Requires.requireNonNull(this.logStoreFactory, "logStoreFactory");
+            Requires.requireNonNull(this.fileSize, "fileSize");
+            Requires.requireNonNull(this.allocateService, "allocateService");
+            return new FileManager(fileType, fileSize, storePath, logStoreFactory, allocateService);
+        }
+    }
+
+    /**
+     * Load abstract files that existed in this store path
+     * This function will be called when db recover
+     * @return the existed files list
+     */
+    public List<AbstractFile> loadExistedFiles() {
+        final File dir = new File(this.storePath);
+        if(!dir.exists()) {
+            dir.mkdirs();
+        }
+        final File[] files = dir.listFiles();
+        if (files == null || files.length == 0) {
+            return Collections.emptyList();
+        }
+        Arrays.sort(files, Comparator.comparing(this::getFileSequenceFromFileName));
+        final List<AbstractFile> blankFiles = new ArrayList<>(files.length);
+        long nextFileSequence = 0;
+        for (final File file : files) {
+            AbstractFile abstractFile;
+            if ((abstractFile = checkFileCorrectnessAndMmap(file)) != null) {
+                if (abstractFile.loadHeader() && !abstractFile.isBlank()) {
+                    this.files.add(abstractFile);
+                } else {
+                    abstractFile.reset();
+                    blankFiles.add(abstractFile);
+                }
+                nextFileSequence = Math.max(nextFileSequence, getFileSequenceFromFileName(file) + 1);
+            }
+        }
+        this.allocateService.setNextFileSequence(nextFileSequence);
+        // Add blank files to allocator so that we can reuse them
+        this.allocateService.addBlankAbstractFiles(blankFiles);
+        return this.files;
+    }
+
+    /**
+     * Check file's correctness of name and length
+     * @return mmap file
+     */
+    private AbstractFile checkFileCorrectnessAndMmap(final File file) {
+        if (!file.exists() || !file.getName().endsWith(this.fileType.getFileSuffix()))
+            { return null; }
+        AbstractFile abstractFile = null;
+        final long fileLength = file.length();
+        if (fileLength == this.fileSize) {
+            abstractFile = this.logStoreFactory.newFile(this.fileType, file.getPath());
+        }
+        return abstractFile;
+    }
+
+    public AbstractFile[] copyFiles() {
+        this.readLock.lock();
+        try {
+            return this.files.toArray(new AbstractFile[] {});
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public AbstractFile getLastFile(final long logIndex, final int waitToWroteSize, final boolean createIfNecessary) {
+        AbstractFile lastFile = null;
+        while (true) {
+            int fileCount = 0;
+            this.readLock.lock();
+            try {
+                if (!this.files.isEmpty()) {
+                    fileCount = this.files.size();
+                    final AbstractFile file = this.files.get(fileCount - 1);
+                    if (waitToWroteSize <= 0 || !file.reachesFileEndBy(waitToWroteSize)) {
+                        lastFile = file;
+                    } else if (file.reachesFileEndBy(waitToWroteSize)) {
+                        // Reach file end , need to fill blank bytes in file end
+                        file.fillEmptyBytesInFileEnd();
+                    }
+                }
+            } finally {
+                this.readLock.unlock();
+            }
+            // Try to get a new file
+            if (lastFile == null && createIfNecessary) {
+                this.writeLock.lock();
+                try {
+                    if (this.files.size() != fileCount) {
+                        // That means already create a new file , just continue and try again
+                        continue;
+                    }
+                    lastFile = this.allocateService.takeEmptyFile();
+                    if (lastFile != null) {
+                        final long newFileOffset = (long) this.files.size() * (long) this.fileSize;
+                        lastFile.setFileFromOffset(newFileOffset);
+                        this.files.add(lastFile);
+                        this.swapOutFilesIfNecessary();
+                        return lastFile;
+                    } else {
+                        continue;
+                    }
+                } catch (final Exception e) {
+                    LOG.error("Error on create new abstract file , current logIndex:{}", logIndex);
+                } finally {
+                    this.writeLock.unlock();
+                }
+            }
+            return lastFile;
+        }
+    }
+
+    public void swapOutFilesIfNecessary() {
+        this.readLock.lock();
+        try {
+            if (this.files.size() <= this.storeOptions.getKeepInMemoryFileCount()) {
+                return;
+            }
+            int filesInMemoryCount = this.allocateService.getAllocatedFileCount();
+            int swappedOutCount = 0;
+            final int lastIndex = this.files.size() - 1;
+            long lastSwappedOutPosition = 0;
+            for (int i = lastIndex; i >= 0; i--) {
+                final AbstractFile abstractFile = this.files.get(i);
+                if (abstractFile.isMapped()) {
+                    filesInMemoryCount++;
+                    if (filesInMemoryCount >= this.storeOptions.getKeepInMemoryFileCount() && i != lastIndex) {
+                        abstractFile.unmmap();
+                        swappedOutCount++;
+                        if (lastSwappedOutPosition == 0) {
+                            lastSwappedOutPosition = abstractFile.getFileFromOffset() + abstractFile.getFileSize();
+                        }
+                    }
+                }
+            }
+            // Because lastSwappedOutPosition means all the data had been flushed before lastSwappedOutPosition,
+            // so we should update flush position
+            if (getFlushedPosition() < lastSwappedOutPosition) {
+                setFlushedPosition(lastSwappedOutPosition);
+            }
+            LOG.info("Swapped out {} abstract files", swappedOutCount);
+        } catch (final Exception e) {
+            LOG.error("Error on swap out files", e);
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * @return the file that contains this logIndex
+     */
+    public AbstractFile findFileByLogIndex(final long logIndex, final boolean returnFirstIfNotFound) {
+        this.readLock.lock();
+        try {
+            if (this.files.isEmpty()) {
+                return null;
+            }
+            if (this.files.size() == 1) {
+                return getFirstFile();
+            }
+            int lo = 0, hi = this.files.size() - 1;
+            while (lo <= hi) {
+                final int mid = (lo + hi) >>> 1;
+                final AbstractFile file = this.files.get(mid);
+                if (file.getLastLogIndex() < logIndex) {
+                    lo = mid + 1;
+                } else if (file.getFirstLogIndex() > logIndex) {
+                    hi = mid - 1;
+                } else {
+                    return this.files.get(mid);
+                }
+            }
+            if (returnFirstIfNotFound) {
+                return getFirstFile();
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+        return null;
+    }
+
+    /*
+     * @param logIndex begin index
+     * @return the files that contain of after this logIndex
+     */
+    public AbstractFile[] findFileFromLogIndex(final long logIndex) {
+        this.readLock.lock();
+        try {
+            for (int i = 0; i < this.files.size(); i++) {
+                AbstractFile file = this.files.get(i);
+                if (file.getFirstLogIndex() <= logIndex && logIndex <= file.getLastLogIndex()) {
+                    final AbstractFile[] result = new AbstractFile[this.files.size() - i + 1];
+                    for (int j = i; j < this.files.size(); j++) {
+                        result[j - i] = this.files.get(j);
+                    }
+                    return result;
+                }
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+        return new AbstractFile[0];
+    }
+
+    /**
+     * @return the file that contains this offset
+     */
+    public AbstractFile findFileByOffset(final long offset, final boolean returnFirstIfNotFound) {
+        this.readLock.lock();
+        try {
+            if (this.files.size() == 0)
+                { return null; }
+            final AbstractFile firstAbstractFile = getFirstFile();
+            final AbstractFile lastAbstractFile = getLastFile();
+            if (firstAbstractFile != null && lastAbstractFile != null) {
+                if (offset < firstAbstractFile.getFileFromOffset()
+                    || offset >= lastAbstractFile.getFileFromOffset() + this.fileSize) {
+                    LOG.warn(
+                        "Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, fileSize: {}, fileNums: {}",
+                        offset, firstAbstractFile.getFileFromOffset(), lastAbstractFile.getFileFromOffset()
+                                                                       + this.fileSize, this.fileSize,
+                        this.files.size());
+                } else {
+                    // Locate the index
+                    final int index = (int) ((offset / this.fileSize) - (firstAbstractFile.getFileFromOffset() / this.fileSize));
+                    AbstractFile targetFile;
+                    targetFile = this.files.get(index);
+
+                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
+                        && offset < targetFile.getFileFromOffset() + this.fileSize) {
+                        return targetFile;
+                    }
+
+                    // If pre not found , then traverse to find target file
+                    for (final AbstractFile abstractFile : this.files) {
+                        if (offset >= abstractFile.getFileFromOffset()
+                            && offset < abstractFile.getFileFromOffset() + this.fileSize) {
+                            return abstractFile;
+                        }
+                    }
+                }
+                if (returnFirstIfNotFound) {
+                    return firstAbstractFile;
+                }
+            }
+        } catch (final Exception e) {
+            LOG.error("Error on find abstractFile by offset :{}, file type:{}", offset, this.fileType.getFileName(), e);
+        } finally {
+            this.readLock.unlock();
+        }
+        return null;
+    }
+
+    /**
+     * Flush from flushPosition
+     * @return true if flush success
+     */
+    public boolean flush() {
+        final long flushWhere = getFlushedPosition();
+        final AbstractFile abstractFile = findFileByOffset(flushWhere, flushWhere == 0);
+        if (abstractFile != null) {
+            final int flushOffset = abstractFile.flush();
+            setFlushedPosition(abstractFile.getFileFromOffset() + flushOffset);
+            return getFlushedPosition() != flushWhere;
+        }
+        return false;
+    }
+
+    public AbstractFile getLastFile() {
+        return this.files.get(this.files.size() - 1);
+    }
+
+    public AbstractFile getFirstFile() {
+        if (!this.files.isEmpty()) {
+            return this.files.get(0);
+        }
+        return null;
+    }
+
+    /**
+     * Truncate files suffix by offset
+     * Only be called when recover
+     */
+    public void truncateSuffixByOffset(final long offset) {
+        this.readLock.lock();
+        final ArrayList<AbstractFile> willRemoveFiles = new ArrayList<>();
+        try {
+            for (final AbstractFile file : this.files) {
+                final long tailOffset = file.getFileFromOffset() + this.fileSize;
+                if (tailOffset > offset) {
+                    if (offset >= file.getFileFromOffset()) {
+                        final int truncatePosition = (int) (offset % this.fileSize);
+                        file.setWrotePosition(truncatePosition);
+                        file.setFlushPosition(truncatePosition);
+                    } else {
+                        willRemoveFiles.add(file);
+                    }
+                }
+            }
+        } finally {
+            this.readLock.unlock();
+            for (final AbstractFile file : willRemoveFiles) {
+                if (file != null) {
+                    file.shutdown(1000, true);
+                }
+            }
+            deleteFiles(willRemoveFiles);
+        }
+    }
+
+    /**
+     * If file's lastLogIndex < first_index_kept , it will be destroyed
+     * But if file's firstLogIndex < firstIndexKept < file's  lastLogIndex, it will be retained
+     */
+    public boolean truncatePrefix(final long firstIndexKept) {
+        this.readLock.lock();
+        final List<AbstractFile> willRemoveFiles = new ArrayList<>();
+        try {
+            for (final AbstractFile abstractFile : this.files) {
+                final long lastLogIndex = abstractFile.getLastLogIndex();
+                if (lastLogIndex < firstIndexKept) {
+                    willRemoveFiles.addAll(this.files);
+                }
+            }
+            return true;
+        } finally {
+            this.readLock.unlock();
+            for (final AbstractFile file : willRemoveFiles) {
+                if (file != null) {
+                    file.shutdown(1000, true);
+                }
+            }
+            deleteFiles(willRemoveFiles);
+        }
+    }
+
+    /**
+     * Truncate files suffix by lastIndexKept , (last_index_kept, last_log_index]
+     *  will be discarded.
+     */
+    public boolean truncateSuffix(final long lastIndexKept, final int pos) {
+        if (getLastLogIndex() <= lastIndexKept) {
+            return true;
+        }
+        this.readLock.lock();
+        final List<AbstractFile> willRemoveFiles = new ArrayList<>();
+        try {
+            long retainPosition = 0;
+            for (final AbstractFile abstractFile : this.files) {
+                final long firstLogIndex = abstractFile.getFirstLogIndex();
+                final long lastLogIndex = abstractFile.getLastLogIndex();
+                if (lastLogIndex > lastIndexKept) {
+                    if (lastIndexKept >= firstLogIndex) {
+                        // Truncate
+                        final int lastPosition = abstractFile.truncate(lastIndexKept + 1, pos);
+                        retainPosition += lastPosition;
+                    } else {
+                        // Remove
+                        willRemoveFiles.add(abstractFile);
+                    }
+                } else {
+                    retainPosition += this.fileSize;
+                }
+            }
+            // Update flush position
+            if (retainPosition < getFlushedPosition())
+                { setFlushedPosition(retainPosition); }
+            return true;
+        } finally {
+            this.readLock.unlock();
+            for (final AbstractFile file : willRemoveFiles) {
+                if (file != null) {
+                    file.shutdown(1000, true);
+                }
+            }
+            deleteFiles(willRemoveFiles);
+        }
+    }
+
+    /**
+     * Clear all files
+     */
+    public boolean reset(final long nextLogIndex) {
+        List<AbstractFile> destroyedFiles = new ArrayList<>();
+        this.writeLock.lock();
+        try {
+            destroyedFiles.addAll(this.files);
+            this.files.clear();
+            setFlushedPosition(0);
+            LOG.info("Destroyed all abstractFiles in path {} by resetting.", this.storePath);
+        } finally {
+            this.writeLock.unlock();
+            for (final AbstractFile file : destroyedFiles) {
+                file.shutdown(1000, true);
+            }
+        }
+        return true;
+    }
+
+    private void deleteFiles(final List<AbstractFile> files) {
+        this.writeLock.lock();
+        try {
+            if (!files.isEmpty()) {
+                files.removeIf(file -> !this.files.contains(file));
+                this.files.removeAll(files);
+            }
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public long getFirstLogIndex() {
+        this.readLock.lock();
+        try {
+            if (!this.files.isEmpty()) {
+                return this.files.get(0).getFirstLogIndex();
+            }
+            return -1;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public long getLastLogIndex() {
+        this.readLock.lock();
+        try {
+            if (!this.files.isEmpty()) {
+                final AbstractFile lastFile = getLastFile();
+                return lastFile.getLastLogIndex();
+            }
+            return -1;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    public void shutdown() {
+        for (final AbstractFile file : this.files) {
+            if (file != null) {
+                file.shutdown(5000, false);
+            }
+        }
+    }
+
+    public long getFlushedPosition() {
+        return flushedPosition;
+    }
+
+    public synchronized void setFlushedPosition(final long flushedPosition) {
+        this.flushedPosition = flushedPosition;
+    }
+
+    public long getFileSequenceFromFileName(final File file) {
+        final String name = file.getName();
+        if (name.endsWith(this.fileType.getFileSuffix())) {
+            int idx = name.indexOf(this.fileType.getFileSuffix());
+            return Long.parseLong(name.substring(0, idx));
+        }
+        return 0;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java
new file mode 100644
index 0000000000..5d1fbc6946
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+/**
+ * Specifies the type of file
+ */
+public enum FileType {
+    FILE_INDEX("indexFile", ".i"), // index file
+    FILE_SEGMENT("segmentFile", ".s"), // segment file
+    FILE_CONFIGURATION("confFile", ".c"); // configuration file
+
+    private final String fileName;
+    private final String fileSuffix;
+
+    FileType(String fileName, String fileSuffix) {
+        this.fileName = fileName;
+        this.fileSuffix = fileSuffix;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public String getFileSuffix() {
+        return fileSuffix;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java
new file mode 100644
index 0000000000..d3e00a98a9
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Abort file
+ */
+public class AbortFile {
+
+    private final String path;
+
+    public String getPath() {
+        return this.path;
+    }
+
+    public AbortFile(final String path) {
+        super();
+        this.path = path;
+    }
+
+    public boolean create() throws IOException {
+        final File file = new File(this.path);
+        if (file.createNewFile()) {
+            writeDate();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private void writeDate() throws IOException {
+        final File file = new File(this.path);
+        try (final FileWriter writer = new FileWriter(file, false)) {
+            writer.write(new Date().toGMTString());
+            writer.write(System.lineSeparator());
+        }
+    }
+
+    public void touch() throws IOException {
+        writeDate();
+    }
+
+    public boolean exists() {
+        final File file = new File(this.path);
+        return file.isFile() && file.exists();
+    }
+
+    public boolean destroy() {
+        return new File(this.path) //
+            .delete();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java
new file mode 100644
index 0000000000..bf89b3ffe8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import java.io.IOException;
+
+import java.nio.file.Path;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.io.MessageFile;
+
+/**
+ * Abstract checkpoint file
+ */
+public abstract class Checkpoint {
+
+    private final String path;
+    private final RaftMessagesFactory raftMessagesFactory;
+
+    public Checkpoint(final String path, RaftOptions raftOptions) {
+        super();
+        this.path = path;
+        this.raftMessagesFactory = raftOptions.getRaftMessagesFactory();
+    }
+
+    /**
+     * Encode metadata
+     */
+    public abstract byte[] encode();
+
+    /**
+     * Decode file data
+     */
+    public abstract boolean decode(final byte[] bs);
+
+    public synchronized boolean save() throws IOException {
+        MessageFile file = new MessageFile(this.path);
+        final byte[] data = this.encode();
+        final LocalFileMeta meta = raftMessagesFactory.localFileMeta() //
+            .userMeta(data) //
+            .build();
+        return file.save(meta, true);
+    }
+
+    public void load() throws IOException {
+        MessageFile file = new MessageFile(this.path);
+        final LocalFileMeta meta = file.load();
+        if (meta != null) {
+            final byte[] data = meta.userMeta();
+            decode(data);
+        }
+    }
+
+    public void destroy() {
+        IgniteUtils.deleteIfExists(Path.of(this.path));
+    }
+
+    public String getPath() {
+        return this.path;
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java
new file mode 100644
index 0000000000..cd9b532bc4
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.Bits;
+
+/**
+ * This checkpoint is used for save firstLogIndex
+ */
+public class FirstLogIndexCheckpoint extends Checkpoint {
+
+    // LogStorage first log index
+    public long firstLogIndex = -1;
+
+    public FirstLogIndexCheckpoint(final String path, RaftOptions raftOptions) {
+        super(path, raftOptions);
+    }
+
+    /**
+     * firstLogIndex (8 bytes)
+     */
+    public byte[] encode() {
+        byte[] bs = new byte[8];
+        Bits.putLong(bs, 0, this.firstLogIndex);
+        return bs;
+    }
+
+    public boolean decode(final byte[] bs) {
+        if (bs.length < 8) {
+            return false;
+        }
+        this.firstLogIndex = Bits.getLong(bs, 0);
+        return this.firstLogIndex >= 0;
+    }
+
+    public void setFirstLogIndex(final long firstLogIndex) {
+        this.firstLogIndex = firstLogIndex;
+    }
+
+    public void reset() {
+        this.firstLogIndex = -1;
+    }
+
+    public boolean isInit() {
+        return this.firstLogIndex >= 0;
+    }
+
+    @Override
+    public String toString() {
+        return "FirstLogIndexCheckpoint{" + "firstLogIndex=" + firstLogIndex + '}';
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java
new file mode 100644
index 0000000000..0c1736be07
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.AsciiStringUtil;
+import org.apache.ignite.raft.jraft.util.Bits;
+
+/**
+ * This checkpoint is used for save flushPosition and last LogIndex
+ */
+public class FlushStatusCheckpoint extends Checkpoint {
+
+    // Current file name
+    public String fileName;
+    // Current flush position.
+    public long   flushPosition;
+    // Last logIndex
+    public long   lastLogIndex;
+
+    public FlushStatusCheckpoint(final String path, RaftOptions raftOptions) {
+        super(path, raftOptions);
+    }
+
+    /**
+     * flushPosition (8 bytes) + lastLogIndex (8 bytes) + path(4 bytes len + string bytes)
+     */
+    public byte[] encode() {
+        byte[] ps = AsciiStringUtil.unsafeEncode(this.fileName);
+        byte[] bs = new byte[20 + ps.length];
+        Bits.putLong(bs, 0, this.flushPosition);
+        Bits.putLong(bs, 8, this.lastLogIndex);
+        Bits.putInt(bs, 16, ps.length);
+        System.arraycopy(ps, 0, bs, 20, ps.length);
+        return bs;
+    }
+
+    public boolean decode(final byte[] bs) {
+        if (bs.length < 20) {
+            return false;
+        }
+        this.flushPosition = Bits.getLong(bs, 0);
+        this.lastLogIndex = Bits.getLong(bs, 8);
+        int len = Bits.getInt(bs, 16);
+        this.fileName = AsciiStringUtil.unsafeDecode(bs, 20, len);
+        return this.flushPosition >= 0 && this.lastLogIndex >= 0 && !this.fileName.isEmpty();
+    }
+
+    public void setFileName(final String fileName) {
+        this.fileName = fileName;
+    }
+
+    public void setFlushPosition(final long flushPosition) {
+        this.flushPosition = flushPosition;
+    }
+
+    public void setLastLogIndex(final long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "FlushStatusCheckpoint{" + "segFilename='" + fileName + '\'' + ", flushPosition=" + flushPosition
+               + ", lastLogIndex=" + lastLogIndex + '}';
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
new file mode 100644
index 0000000000..d2399afbfc
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.index;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+
+/**
+ *  * File header:
+ *  * <pre>
+ *  *   magic bytes       first log index   file from offset       reserved
+ *  *   [0x20 0x20]      [... 8 bytes...]   [... 8 bytes...]   [... 8 bytes...]
+ *  * </pre>
+ *  *
+ *  * Every record format is:
+ *  * <pre>
+ *  *    Magic byte     index type     offset        position
+ *  *   [0x57]         [1 byte]      [4 bytes]     [4 bytes]
+ *  *</pre>
+ *  *
+ * The implementation of offset index
+ */
+public class IndexFile extends AbstractFile {
+    private static final IgniteLogger LOG = Loggers.forClass(IndexFile.class);
+
+    /**
+     * Magic bytes for data buffer.
+     */
+    public static final byte[]  RECORD_MAGIC_BYTES      = new byte[] { (byte) 0x57 };
+
+    public static final int     RECORD_MAGIC_BYTES_SIZE = RECORD_MAGIC_BYTES.length;
+
+    public IndexFile(final String filePath, final int fileSize) {
+        super(filePath, fileSize, true);
+    }
+
+    /**
+     * The offset entry of Index, including (1 byte magic, 4 bytes offset, 4 bytes position, 1 byte logType)
+     */
+    public static class IndexEntry {
+        // Log index
+        private long            logIndex;
+        // Relative offset
+        private int             offset;
+        // Physical position
+        private int             position;
+        // LogType
+        private byte            logType;
+
+        // Index entry size
+        public static final int INDEX_SIZE = 10;
+
+        public IndexEntry(final long logIndex, final int position, final byte logType) {
+            this(logIndex, 0, position, logType);
+        }
+
+        public IndexEntry(final int offset, final int position) {
+            this(0, offset, position, IndexType.IndexSegment.getType());
+        }
+
+        public IndexEntry(final long logIndex, final int offset, final int position, final byte logType) {
+            this.logIndex = logIndex;
+            this.offset = offset;
+            this.position = position;
+            this.logType = logType;
+        }
+
+        public static IndexEntry newInstance() {
+            return new IndexEntry(-1, -1);
+        }
+
+        public void setLogIndex(final long logIndex) {
+            this.logIndex = logIndex;
+        }
+
+        public long getLogIndex() {
+            return logIndex;
+        }
+
+        public int getOffset() {
+            return offset;
+        }
+
+        public int getPosition() {
+            return position;
+        }
+
+        public byte getLogType() {
+            return logType;
+        }
+
+        public boolean decode(final ByteBuffer buffer) {
+            if (buffer == null || buffer.remaining() < INDEX_SIZE) {
+                LOG.error("Fail to decode index entry , invalid buffer length: {}",
+                    buffer == null ? 0 : buffer.remaining());
+                return false;
+            }
+            final byte[] magic = new byte[1];
+            buffer.get(magic);
+            if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+                LOG.error("Fail to decode index entry, invalid buffer magic");
+                return false;
+            }
+            this.logType = buffer.get();
+            this.offset = buffer.getInt();
+            this.position = buffer.getInt();
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "IndexEntry{" + "logIndex=" + logIndex + ", offset=" + offset + ", position=" + position
+                   + ", logType=" + logType + '}';
+        }
+    }
+
+    /**
+     * Write the index entry
+     *  @param logIndex the log index
+     * @param position the physical position
+     * @param logType [1] means logEntry , [2] means confEntry
+     */
+    public int appendIndex(final long logIndex, final int position, final byte logType) {
+        this.writeLock.lock();
+        try {
+            assert (logIndex > getLastLogIndex());
+            final byte[] writeData = encodeData(toRelativeOffset(logIndex), position, logType);
+            return doAppend(logIndex, writeData);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private byte[] encodeData(final int offset, final int position, final byte logType) {
+        final ByteBuffer buffer = ByteBuffer.allocate(getIndexSize());
+        // Magics
+        buffer.put(RECORD_MAGIC_BYTES);
+        // logType (segmentLog or conf)
+        buffer.put(logType);
+        // offset from FirstLogIndex
+        buffer.putInt(offset);
+        // phyPosition in segmentFile
+        buffer.putInt(position);
+        buffer.flip();
+        return buffer.array();
+    }
+
+    /**
+     * Find the index entry
+     * @param  logIndex the target log index
+     * @return a pair holding this offset and its physical file position.
+     */
+    public IndexEntry lookupIndex(final long logIndex) {
+        mapInIfNecessary();
+        this.readLock.lock();
+        try {
+            final ByteBuffer byteBuffer = sliceByteBuffer();
+            final int slot = (int) (logIndex - this.header.getFirstLogIndex());
+            if (slot < 0) {
+                return IndexEntry.newInstance();
+            } else {
+                return parseEntry(byteBuffer, slot);
+            }
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public CheckDataResult checkData(final ByteBuffer buffer) {
+        if (buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        // Check magic
+        final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+        buffer.get(magic);
+        if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+            return CheckDataResult.FILE_END;
+        }
+        // Check index type
+        final byte indexType = buffer.get();
+        if (indexType != IndexType.IndexSegment.getType() && indexType != IndexType.IndexConf.getType()) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        if (buffer.remaining() < getIndexSize() - RECORD_MAGIC_BYTES_SIZE - 1) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
+        result.setSize(getIndexSize());
+        return result;
+    }
+
+    @Override
+    public void onRecoverDone(final int recoverPosition) {
+        final int indexNum = (recoverPosition - this.header.getHeaderSize()) / IndexEntry.INDEX_SIZE;
+        this.header.setLastLogIndex(this.header.getFirstLogIndex() + indexNum);
+    }
+
+    @Override
+    public int truncate(final long logIndex, final int pos) {
+        this.writeLock.lock();
+        try {
+            if (logIndex < this.header.getFirstLogIndex() || logIndex > this.header.getLastLogIndex()) {
+                return 0;
+            }
+            final int slot = (int) (logIndex - this.header.getFirstLogIndex());
+            return truncateToSlot(slot);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Truncate to a known number of slot.
+     */
+    public int truncateToSlot(final int slot) {
+        this.writeLock.lock();
+        try {
+            this.header.setLastLogIndex(this.header.getFirstLogIndex() + slot - 1);
+            final int lastPos = this.header.getHeaderSize() + slot * getIndexSize();
+            updateAllPosition(lastPos);
+            clear(getWrotePosition());
+            return lastPos;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    /**
+     * Parse an index entry from this index file
+     */
+    private IndexEntry parseEntry(final ByteBuffer buffer, final int n) {
+        final int pos = this.header.getHeaderSize() + n * getIndexSize();
+        buffer.position(pos);
+        final IndexEntry indexEntry = IndexEntry.newInstance();
+        indexEntry.decode(buffer);
+        return indexEntry;
+    }
+
+    /**
+     * Return the relative offset
+     */
+    private int toRelativeOffset(final long offset) {
+        if (this.header.isBlank()) {
+            return 0;
+        } else {
+            return (int) (offset - this.header.getFirstLogIndex());
+        }
+    }
+
+    /**
+     * @return the IndexEntry's phyPosition in this IndexFile
+     */
+    public int calculateIndexPos(final long logIndex) {
+        return (int) (this.header.getHeaderSize() + (logIndex - getFirstLogIndex() + 1) * getIndexSize());
+    }
+
+    public int getIndexSize() {
+        return IndexEntry.INDEX_SIZE;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java
new file mode 100644
index 0000000000..96fd2038be
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.index;
+
+public enum IndexType {
+
+    IndexSegment((byte) 1), // Segment log' s index
+    IndexConf((byte) 2); // Conf log's index
+
+    private final byte indexType;
+
+    IndexType(final byte indexType) {
+        this.indexType = indexType;
+    }
+
+    public byte getType() {
+        return indexType;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
new file mode 100644
index 0000000000..ebc675cc1a
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.segment;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+
+/**
+ *  * File header:
+ *  * <pre>
+ *  *   magic bytes       first log index   file from offset       reserved
+ *  *   [0x20 0x20]      [... 8 bytes...]   [... 8 bytes...]   [... 8 bytes...]
+ *  * <pre>
+ *
+ *  * Every record format is:
+ *  * <pre>
+ *   Magic bytes     data length   data
+ *   [0x57, 0x8A]    [4 bytes]     [bytes]
+ *  *</pre>
+ *  *
+ */
+public class SegmentFile extends AbstractFile {
+    private static final IgniteLogger LOG = Loggers.forClass(SegmentFile.class);
+
+    /**
+     * Magic bytes for data buffer.
+     */
+    public static final byte[]  RECORD_MAGIC_BYTES      = new byte[] { (byte) 0x57, (byte) 0x8A };
+
+    public static final int     RECORD_MAGIC_BYTES_SIZE = RECORD_MAGIC_BYTES.length;
+
+    // 4 Bytes for written data length
+    private static final int    RECORD_DATA_LENGTH_SIZE = 4;
+
+    public SegmentFile(final String filePath, final int fileSize) {
+        super(filePath, fileSize, true);
+    }
+
+    /**
+     *
+     * Write the data and return it's wrote position.
+     * @param logIndex the log index
+     * @param data     data to write
+     * @return the wrote position
+     */
+    public int appendData(final long logIndex, final byte[] data) {
+        this.writeLock.lock();
+        try {
+            assert (logIndex > getLastLogIndex());
+            final byte[] writeData = encodeData(data);
+            return doAppend(logIndex, writeData);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private byte[] encodeData(final byte[] data) {
+        ByteBuffer buffer = ByteBuffer.allocate(getWriteBytes(data));
+        buffer.put(RECORD_MAGIC_BYTES);
+        buffer.putInt(data.length);
+        buffer.put(data);
+        buffer.flip();
+        return buffer.array();
+    }
+
+    /**
+     * Read data from the position.
+     *
+     * @param logIndex the log index
+     * @param pos      the position to read
+     * @return read data
+     */
+    public byte[] lookupData(final long logIndex, final int pos) {
+        assert (pos >= this.header.getHeaderSize());
+        mapInIfNecessary();
+        this.readLock.lock();
+        try {
+            if (logIndex < this.header.getFirstLogIndex() || logIndex > this.getLastLogIndex()) {
+                LOG.warn(
+                    "Try to read data from segment file {} out of range, logIndex={}, readPos={}, firstLogIndex={}, lastLogIndex={}.",
+                    getFilePath(), logIndex, pos, this.header.getFirstLogIndex(), getLastLogIndex());
+                return null;
+            }
+            if (pos > getFlushedPosition()) {
+                LOG.warn(
+                    "Try to read data from segment file {} out of comitted position, logIndex={}, readPos={}, wrotePos={}, flushPos={}.",
+                    getFilePath(), logIndex, pos, getWrotePosition(), getFlushedPosition());
+                return null;
+            }
+            return lookupData(pos);
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    /**
+     * Read data from the position
+     * @param pos      the position to read
+     * @return read data
+     */
+    public byte[] lookupData(final int pos) {
+        assert (pos >= this.header.getHeaderSize());
+        mapInIfNecessary();
+        this.readLock.lock();
+        try {
+            final ByteBuffer readBuffer = sliceByteBuffer();
+            readBuffer.position(pos);
+            if (readBuffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+                return null;
+            }
+            final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+            readBuffer.get(magic);
+            if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+                return null;
+            }
+            final int dataLen = readBuffer.getInt();
+            if (dataLen <= 0) {
+                return null;
+            }
+            final byte[] data = new byte[dataLen];
+            readBuffer.get(data);
+            return data;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public CheckDataResult checkData(final ByteBuffer buffer) {
+        if (buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        // Check magic
+        final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+        buffer.get(magic);
+        if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+            return CheckDataResult.FILE_END;
+        }
+        // Check len
+        if (buffer.remaining() < RECORD_DATA_LENGTH_SIZE) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        final int dataLen = buffer.getInt();
+        if (buffer.remaining() < dataLen) {
+            return CheckDataResult.CHECK_FAIL;
+        }
+        final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
+        result.setSize(RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + dataLen);
+        return result;
+    }
+
+    @Override
+    public void onRecoverDone(final int recoverPosition) {
+        // Since the logs index in the segmentFile are discontinuous, we should set LastLogIndex by reading and deSerializing last entry log
+        final ByteBuffer buffer = sliceByteBuffer();
+        buffer.position(recoverPosition);
+        final byte[] data = lookupData(recoverPosition);
+        if (data != null) {
+            final LogEntry lastEntry = LogEntryV1CodecFactory.getInstance().decoder().decode(data);
+            if (lastEntry != null) {
+                setLastLogIndex(lastEntry.getId().getIndex());
+            }
+        }
+    }
+
+    @Override
+    public int truncate(final long logIndex, final int pos) {
+        this.writeLock.lock();
+        try {
+            if (logIndex < this.header.getFirstLogIndex() || logIndex > this.header.getLastLogIndex()) {
+                return 0;
+            }
+            if (pos < 0) {
+                return getWrotePosition();
+            }
+            updateAllPosition(pos);
+            clear(getWrotePosition());
+            this.header.setLastLogIndex(logIndex - 1);
+            return pos;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    public static int getWriteBytes(final byte[] data) {
+        return RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java
new file mode 100644
index 0000000000..7bd491ac84
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.service;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ShutdownAbleThread;
+import org.apache.ignite.raft.jraft.util.ArrayDeque;
+import org.apache.ignite.raft.jraft.util.OnlyForTest;
+
+/**
+ * Pre allocate abstractFile service
+ */
+public class AllocateFileService extends ShutdownAbleThread {
+    private final FileType                    fileType;
+    private final String                      storePath;
+    private final StoreOptions                storeOptions;
+    private final LogStoreFactory             logStoreFactory;
+    // Pre-allocated files
+    private final ArrayDeque<AllocatedResult> blankFiles       = new ArrayDeque<>();
+    // Abstract file sequence.
+    private final AtomicLong                  nextFileSequence = new AtomicLong(0);
+    private final Lock                        allocateLock     = new ReentrantLock();
+    private final Condition                   fullCond         = this.allocateLock.newCondition();
+    private final Condition                   emptyCond        = this.allocateLock.newCondition();
+
+    public AllocateFileService(final AbstractDB abstractDB, final LogStoreFactory logStoreFactory) {
+        this.fileType = abstractDB.getDBFileType();
+        this.storePath = abstractDB.getStorePath();
+        this.storeOptions = logStoreFactory.getStoreOptions();
+        this.logStoreFactory = logStoreFactory;
+    }
+
+    @OnlyForTest
+    public AllocateFileService(final FileType fileType, final String storePath, final LogStoreFactory logStoreFactory) {
+        this.fileType = fileType;
+        this.storePath = storePath;
+        this.logStoreFactory = logStoreFactory;
+        this.storeOptions = logStoreFactory.getStoreOptions();
+    }
+
+    public static class AllocatedResult {
+        AbstractFile abstractFile;
+
+        public AllocatedResult(final AbstractFile abstractFile) {
+            super();
+            this.abstractFile = abstractFile;
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (!isStopped()) {
+                doAllocateFileInLock();
+            }
+        } catch (final InterruptedException ignored) {
+        }
+        onShutdown();
+    }
+
+    @Override
+    public void onShutdown() {
+        // Destroy all empty file
+        for (final AllocatedResult result : this.blankFiles) {
+            if (result.abstractFile != null) {
+                result.abstractFile.shutdown(5000, false);
+            }
+        }
+    }
+
+    private AbstractFile allocateNewAbstractFile() {
+        final String newFilePath = getNewFilePath();
+        final AbstractFile file = this.logStoreFactory.newFile(this.fileType, newFilePath);
+        if (this.storeOptions.isEnableWarmUpFile()) {
+            file.warmupFile();
+        }
+        return file;
+    }
+
+    private void doAllocateFile0() {
+        final AbstractFile abstractFile = allocateNewAbstractFile();
+        this.blankFiles.add(new AllocatedResult(abstractFile));
+    }
+
+    private void doAllocateFileInLock() throws InterruptedException {
+        this.allocateLock.lock();
+        try {
+            while (this.blankFiles.size() >= this.storeOptions.getPreAllocateFileCount()) {
+                this.fullCond.await();
+            }
+            doAllocateFile0();
+            this.emptyCond.signal();
+        } finally {
+            this.allocateLock.unlock();
+        }
+    }
+
+    public AbstractFile takeEmptyFile() throws Exception {
+        this.allocateLock.lock();
+        try {
+            while (this.blankFiles.isEmpty()) {
+                this.emptyCond.await();
+            }
+            final AllocatedResult result = this.blankFiles.pollFirst();
+            this.fullCond.signal();
+            return result.abstractFile;
+        } finally {
+            this.allocateLock.unlock();
+        }
+    }
+
+    public int getAllocatedFileCount() {
+        return this.blankFiles.size();
+    }
+
+    private String getNewFilePath() {
+        return this.storePath + File.separator + String.format("%019d", this.nextFileSequence.getAndIncrement())
+               + this.fileType.getFileSuffix();
+    }
+
+    public void setNextFileSequence(final long sequence) {
+        this.nextFileSequence.set(sequence);
+    }
+
+    public void addBlankAbstractFiles(final List<AbstractFile> blankFiles) {
+        for (final AbstractFile blankFile : blankFiles) {
+            this.blankFiles.add(new AllocatedResult(blankFile));
+        }
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java
new file mode 100644
index 0000000000..412992edb0
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.Lifecycle;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ShutdownAbleThread;
+
+/**
+ * Manager of service like allocateService
+ */
+public class ServiceManager implements Lifecycle<LogStoreFactory> {
+    private static final IgniteLogger LOG = Loggers.forClass(ServiceManager.class);
+
+    private final AbstractDB         abstractDB;
+    private AllocateFileService      allocateService;
+    // Maybe we will add more services in the future
+    private List<ShutdownAbleThread> serviceList;
+    private final AtomicBoolean      start = new AtomicBoolean(false);
+
+    public ServiceManager(final AbstractDB abstractDB) {
+        this.abstractDB = abstractDB;
+    }
+
+    @Override
+    public boolean init(final LogStoreFactory logStoreFactory) {
+        this.allocateService = logStoreFactory.newAllocateService(this.abstractDB);
+        this.serviceList = new ArrayList<>(1);
+        this.serviceList.add(allocateService);
+        return true;
+    }
+
+    public void start() {
+        if (!this.start.compareAndSet(false, true)) {
+            return;
+        }
+        for (final ShutdownAbleThread serviceThread : this.serviceList) {
+            serviceThread.start();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (!this.start.compareAndSet(true, false)) {
+            return;
+        }
+        try {
+            this.allocateService.shutdown(true);
+        } catch (final Exception e) {
+            LOG.error("Error on shutdown {}'s serviceManager,", this.abstractDB.getDBName(), e);
+        }
+    }
+
+    public AllocateFileService getAllocateService() {
+        return this.allocateService;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java
new file mode 100644
index 0000000000..28d85b1ddd
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util;
+
+import java.io.Serializable;
+
+public class Pair<K, V> implements Serializable {
+    private final K key;
+    private final V value;
+
+    public Pair(final K key, final V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public static <K, V> Pair<K, V> of(final K key, final V value) {
+        return new Pair<K, V>(key, value);
+    }
+
+    public K getFirst() {
+        return key;
+    }
+
+    public V getSecond() {
+        return value;
+    }
+
+    @Override
+    public String toString() {
+        return "Pair{" + "key=" + key + ", value=" + value + '}';
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java
new file mode 100644
index 0000000000..39595adca6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util.concurrent;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Reference counting base class, similar to the C++ smart pointer implementation
+ * Forked from <a href="https://github.com/apache/rocketmq">rocketmq</a>.
+ */
+public abstract class ReferenceResource {
+    protected final AtomicLong refCount               = new AtomicLong(1);
+    protected volatile boolean available              = true;
+    protected volatile boolean cleanupOver            = false;
+    private volatile long      firstShutdownTimestamp = 0;
+
+    /**
+     * Whether the resource can hold
+     */
+    public synchronized boolean hold() {
+        if (this.isAvailable()) {
+            if (this.refCount.getAndIncrement() > 0) {
+                return true;
+            } else {
+                this.refCount.getAndDecrement();
+            }
+        }
+        return false;
+    }
+
+    public boolean isAvailable() {
+        return this.available;
+    }
+
+    public void shutdown(final long intervalForcibly) {
+        if (this.available) {
+            this.available = false;
+            this.firstShutdownTimestamp = System.currentTimeMillis();
+            this.release();
+        } else if (this.getRefCount() > 0) {
+            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
+                this.refCount.set(-1000 - this.getRefCount());
+                this.release();
+            }
+        }
+    }
+
+    /**
+     * Release resource
+     */
+    public void release() {
+        long value = this.refCount.decrementAndGet();
+        if (value > 0) {
+            return;
+        }
+        synchronized (this) {
+            this.cleanupOver = this.cleanup(value);
+        }
+    }
+
+    public long getRefCount() {
+        return this.refCount.get();
+    }
+
+    public boolean isCleanupOver() {
+        return this.refCount.get() <= 0 && this.cleanupOver;
+    }
+
+    public abstract boolean cleanup(final long currentRef);
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java
new file mode 100644
index 0000000000..ac04aaebfc
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util.concurrent;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+public abstract class ShutdownAbleThread implements Runnable {
+    private static final IgniteLogger LOG = Loggers.forClass(ShutdownAbleThread.class);
+
+    private static final long   JOIN_TIME = 90 * 1000;
+
+    private Thread              thread;
+    protected volatile boolean  stopped   = false;
+    private final AtomicBoolean started   = new AtomicBoolean(false);
+
+    public ShutdownAbleThread() {
+    }
+
+    public abstract void onShutdown();
+
+    public String getServiceName() {
+        return getClass().getSimpleName();
+    }
+
+    public void start() {
+        LOG.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), this.started.get(),
+            this.thread);
+        if (!this.started.compareAndSet(false, true)) {
+            return;
+        }
+        this.stopped = false;
+        this.thread = new Thread(this, getServiceName());
+        this.thread.start();
+    }
+
+    public void shutdown() {
+        this.shutdown(false);
+    }
+
+    public void shutdown(final boolean interrupt) {
+        LOG.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), this.started.get(),
+            this.thread);
+        if (!this.started.compareAndSet(true, false)) {
+            return;
+        }
+        this.stopped = true;
+        try {
+            if (interrupt) {
+                this.thread.interrupt();
+            }
+            this.thread.join(JOIN_TIME);
+        } catch (final InterruptedException e) {
+            LOG.error("Error when shutdown thread, serviceName:{}", getServiceName(), e);
+        }
+    }
+
+    protected void waitForRunning(long interval) throws InterruptedException {
+        Thread.sleep(interval);
+    }
+
+    public boolean isStopped() {
+        return this.stopped;
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
index 42884f6b59..cf9bdfe8f9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.raft.jraft.RaftMessagesFactory;
 import org.apache.ignite.raft.jraft.entity.LocalFileMetaBuilder;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.FileSource;
 import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
 import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -108,7 +109,9 @@ public class LocalSnapshotWriter extends SnapshotWriter {
     public boolean addFile(final String fileName, final Message fileMeta) {
         final LocalFileMetaBuilder metaBuilder = msgFactory.localFileMeta();
         if (fileMeta != null) {
-            metaBuilder.source(((LocalFileMeta)fileMeta).source());
+            FileSource source = ((LocalFileMeta)fileMeta).source();
+
+            metaBuilder.sourceNumber(source == null ? 0 : source.getNumber());
             metaBuilder.checksum(((LocalFileMeta)fileMeta).checksum());
         }
         final LocalFileMeta meta = metaBuilder.build();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
index 2316bf5a34..01e74095f2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
@@ -31,6 +31,8 @@ public class Platform {
 
     private static final boolean IS_MAC = isMac0();
 
+    private static final boolean IS_LINUX = isLinux0();
+
     /**
      * Return {@code true} if the JVM is running on Windows
      */
@@ -45,6 +47,13 @@ public class Platform {
         return IS_MAC;
     }
 
+    /**
+     * Return {@code true} if the JVM is running on Linux OSX
+     */
+    public static boolean isLinux() {
+        return IS_LINUX;
+    }
+
     private static boolean isMac0() {
         final boolean mac = SystemPropertyUtil.get("os.name", "") //
             .toLowerCase(Locale.US) //
@@ -64,4 +73,14 @@ public class Platform {
         }
         return windows;
     }
+
+    private static boolean isLinux0() {
+        final boolean linux = SystemPropertyUtil.get("os.name", "") //
+            .toLowerCase(Locale.US) //
+            .contains("linux");
+        if (linux) {
+            LOG.debug("Platform: Linux");
+        }
+        return linux;
+    }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
index 6b0d4d6907..85bd719d13 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.ignite.raft.jraft.storage.impl;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,12 +42,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
 public abstract class BaseLogStorageTest extends BaseStorageTest {
     protected LogStorage logStorage;
     private ConfigurationManager confManager;
@@ -90,15 +90,15 @@ public abstract class BaseLogStorageTest extends BaseStorageTest {
         assertEquals(entry1, this.logStorage.getEntry(100));
         assertEquals(1, this.logStorage.getTerm(100));
 
-        final LogEntry entry2 = TestUtils.mockEntry(200, 2);
+        final LogEntry entry2 = TestUtils.mockEntry(101, 2);
         assertTrue(this.logStorage.appendEntry(entry2));
 
         assertEquals(100, this.logStorage.getFirstLogIndex());
-        assertEquals(200, this.logStorage.getLastLogIndex());
+        assertEquals(101, this.logStorage.getLastLogIndex());
         assertEquals(entry1, this.logStorage.getEntry(100));
-        assertEquals(entry2, this.logStorage.getEntry(200));
+        assertEquals(entry2, this.logStorage.getEntry(101));
         assertEquals(1, this.logStorage.getTerm(100));
-        assertEquals(2, this.logStorage.getTerm(200));
+        assertEquals(2, this.logStorage.getTerm(101));
     }
 
     @Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
index 06064fc527..405426d896 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
@@ -16,16 +16,16 @@
  */
 package org.apache.ignite.raft.jraft.storage.io;
 
-import java.io.File;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter;
-import org.junit.jupiter.api.Test;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.File;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter;
+import org.junit.jupiter.api.Test;
+
 public class MessageFileTest {
     @Test
     public void testSaveLoad() throws Exception {
@@ -37,7 +37,7 @@ public class MessageFileTest {
         LocalFileMetaOutter.LocalFileMeta msg = new RaftMessagesFactory()
             .localFileMeta()
             .checksum("test")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_REFERENCE)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_REFERENCE.getNumber())
             .build();
         assertTrue(file.save(msg, true));
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java
new file mode 100644
index 0000000000..53a1ebb3c3
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.mockEntry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+
+public class BaseLogitStorageTest extends BaseStorageTest {
+    protected StoreOptions storeOptions = new StoreOptions();
+    protected int                  indexEntrySize;
+    protected int                  headerSize;
+    protected int                  indexFileSize;
+    protected int                  segmentFileSize;
+    protected ConfigurationManager confManager;
+    protected LogEntryCodecFactory logEntryCodecFactory;
+
+    protected LogStoreFactory logStoreFactory;
+
+    protected final byte           segmentIndex = IndexType.IndexSegment.getType();
+
+    public void setup() throws Exception {
+        indexEntrySize = IndexEntry.INDEX_SIZE;
+        headerSize = FileHeader.HEADER_SIZE;
+        indexFileSize = headerSize + 10 * indexEntrySize;
+        this.segmentFileSize = 300;
+
+        storeOptions.setIndexFileSize(indexFileSize);
+        storeOptions.setSegmentFileSize(segmentFileSize);
+        storeOptions.setConfFileSize(segmentFileSize);
+        this.logStoreFactory = new LogStoreFactory(storeOptions, new RaftOptions());
+
+        this.confManager = new ConfigurationManager();
+        this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+    }
+
+    protected byte[] genData(final int index, final int term, int size) {
+        final LogEntry entry = mockEntry(index, term, size - 16);
+        final byte[] data = LogEntryV1CodecFactory.getInstance().encoder().encode(entry);
+        assertEquals(size, data.length);
+        return data;
+    }
+
+    protected LogStorageOptions newLogStorageOptions() {
+        final LogStorageOptions opts = new LogStorageOptions();
+        opts.setConfigurationManager(this.confManager);
+        opts.setLogEntryCodecFactory(this.logEntryCodecFactory);
+        return opts;
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
new file mode 100644
index 0000000000..2fa2fe0590
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import static org.apache.ignite.raft.jraft.entity.PeerId.emptyPeer;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.BaseLogStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.LogitLogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class LogitLogStorageTest extends BaseLogStorageTest {
+    private LogitLogStorageFactory logStorageFactory;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        logStorageFactory = new LogitLogStorageFactory(testStoreOptions());
+        logStorageFactory.start();
+
+        super.setup();
+    }
+
+    private static StoreOptions testStoreOptions() {
+        StoreOptions storeOptions = new StoreOptions();
+
+        storeOptions.setSegmentFileSize(512 * 1024);
+        storeOptions.setConfFileSize(512 * 1024);
+        storeOptions.setEnableWarmUpFile(false);
+
+        return storeOptions;
+    }
+
+    @AfterEach
+    @Override
+    public void teardown() {
+        logStorageFactory.close();
+
+        super.teardown();
+    }
+
+    @Override
+    protected LogStorage newLogStorage() {
+        return logStorageFactory.createLogStorage(this.path.toString(), new RaftOptions());
+    }
+
+    /************************  Test consistency between dbs   ***********************************/
+
+    @Test
+    public void testAlignLogWhenLostIndex() {
+        final List<LogEntry> entries = TestUtils.mockEntries(20);
+        // Set 13 - 16 to be conf entry
+        for (int i = 13; i <= 16; i++) {
+            final LogEntry entry = entries.get(i);
+            entry.setType(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+            entry.setPeers(List.of(emptyPeer()));
+        }
+        this.logStorage.appendEntries(entries);
+        assertEquals(19, this.logStorage.getLastLogIndex());
+
+        // Truncate index db to the index 12, when logStorage reStart, the missing index will be recovered from log db;
+        final IndexDB indexDB = ((LogitLogStorage) this.logStorage).getIndexDB();
+        indexDB.truncateSuffix(12, 0);
+        this.logStorage.shutdown();
+        this.logStorage.init(newLogStorageOptions());
+
+        assertEquals(19, this.logStorage.getLastLogIndex());
+        for (int i = 0; i <= 19; i++) {
+            final LogEntry entry = this.logStorage.getEntry(i);
+            assertEquals(i, entry.getId().getIndex());
+        }
+    }
+
+    @Test
+    public void testAlignLogWhenMoreIndex() {
+        final List<LogEntry> entries = TestUtils.mockEntries(15);
+        this.logStorage.appendEntries(entries);
+        // Append more index into indexDB
+        final IndexDB indexDB = ((LogitLogStorage) this.logStorage).getIndexDB();
+        long maxFlushPosition = 0;
+        for (int i = 15; i <= 20; i++) {
+            final Pair<Integer, Long> flushPair = indexDB.appendIndexAsync(i, 0, IndexType.IndexSegment);
+            maxFlushPosition = Math.max(maxFlushPosition, flushPair.getSecond());
+        }
+        indexDB.waitForFlush(maxFlushPosition, 100);
+        // Recover
+        this.logStorage.shutdown();
+        this.logStorage.init(newLogStorageOptions());
+
+        // In this case, logitLogStorage will truncate indexdb to the index of 14
+        final IndexDB indexDB1 = ((LogitLogStorage) this.logStorage).getIndexDB();
+        assertEquals(14, indexDB1.getLastLogIndex());
+
+        for (int i = 0; i <= 14; i++) {
+            final LogEntry entry = this.logStorage.getEntry(i);
+            assertEquals(i, entry.getId().getIndex());
+        }
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
new file mode 100644
index 0000000000..f58fa63a79
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.ConfDB;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class ConfDBTest extends BaseLogitStorageTest {
+    private ConfDB confDB;
+    private String               confStorePath;
+    private LogEntryCodecFactory logEntryCodecFactory;
+    private LogEntryDecoder decoder;
+    private LogEntryEncoder encoder;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.confStorePath = this.path + File.separator + "conf";
+        Files.createDirectories(Path.of(confStorePath));
+        this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+        decoder = this.logEntryCodecFactory.decoder();
+        encoder = this.logEntryCodecFactory.encoder();
+        this.init();
+    }
+
+    public void init() {
+        this.confDB = new ConfDB(this.confStorePath);
+        this.confDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.confDB.shutdown();
+    }
+
+    @Test
+    public void testAppendConfAndIter() throws Exception {
+        this.confDB.startServiceManager();
+        final LogEntry confEntry1 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+        confEntry1.setId(new LogId(1, 1));
+        final List<PeerId> conf1Peers = JRaftUtils.getConfiguration("localhost:8081,localhost:8082").listPeers();
+        confEntry1.setPeers(conf1Peers);
+
+        final LogEntry confEntry2 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+        confEntry2.setId(new LogId(2, 2));
+        final List<PeerId> conf2Peers = JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083")
+            .listPeers();
+        confEntry2.setPeers(conf2Peers);
+        {
+
+            this.confDB.appendLogAsync(1, this.encoder.encode(confEntry1));
+            final Pair<Integer, Long> posPair = this.confDB.appendLogAsync(2, this.encoder.encode(confEntry2));
+            this.confDB.waitForFlush(posPair.getSecond(), 100);
+        }
+        {
+            final AbstractDB.LogEntryIterator iterator = this.confDB.iterator(this.decoder);
+            final LogEntry conf1 = iterator.next();
+            assertEquals(toString(conf1.getPeers()), toString(conf1Peers));
+
+            final LogEntry conf2 = iterator.next();
+            assertEquals(toString(conf2.getPeers()), toString(conf2Peers));
+
+            assertNull(iterator.next());
+        }
+    }
+
+    public String toString(final List<PeerId> peers) {
+        final StringBuilder sb = new StringBuilder();
+        int i = 0;
+        int size = peers.size();
+        for (final PeerId peer : peers) {
+            sb.append(peer);
+            if (i < size - 1) {
+                sb.append(",");
+            }
+            i++;
+        }
+        return sb.toString();
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
new file mode 100644
index 0000000000..9e5930b896
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.AbortFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class IndexDBTest extends BaseLogitStorageTest {
+    private IndexDB indexDB;
+    private String    indexStorePath;
+    private AbortFile abortFile;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.indexStorePath = this.path + File.separator + "index";
+        this.abortFile = new AbortFile(this.indexStorePath + File.separator + "Abort");
+        Files.createDirectories(Path.of(indexStorePath));
+        this.init();
+    }
+
+    public void init() {
+        this.indexDB = new IndexDB(this.indexStorePath);
+        this.indexDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.indexDB.shutdown();
+    }
+
+    /**
+     * When call testAppendIndex
+     * The FileManager's file state is :
+     *
+     * fileId   fileFromOffset    firstLogIndex  lastLogIndex  fileLastOffset         wrotePosition
+     * 0        0                 0              9             26 + 100 = 126         26 + 100
+     * 1        26 + 100          10             15            26 + 26 + 160 = 212    26 + 60
+     */
+
+    /**
+     * Test for Service , which enables auto flush
+     */
+    @Test
+    public void testAppendIndex() throws Exception {
+        this.indexDB.startServiceManager();
+        {
+            // Append 10 index to first file , and come to the file end (size:130)
+            for (int i = 0; i < 10; i++) {
+                this.indexDB.appendIndexAsync(i, i, IndexType.IndexSegment);
+            }
+            // Write 5 index to second file , wrotePosition = 30 + 50
+            Pair<Integer, Long> posPair = null;
+            for (int i = 10; i <= 15; i++) {
+                posPair = this.indexDB.appendIndexAsync(i, i, IndexType.IndexSegment);
+            }
+
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+
+            assertEquals(5, this.indexDB.lookupIndex(15).getOffset());
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+    @Test
+    public void testLookupFirstLogPosFromLogIndex() {
+        this.indexDB.startServiceManager();
+        {
+            this.indexDB.appendIndexAsync(1, 1, IndexType.IndexSegment);
+            this.indexDB.appendIndexAsync(2, 2, IndexType.IndexSegment);
+            final Pair<Integer, Long> posPair = this.indexDB.appendIndexAsync(3, 3, IndexType.IndexConf);
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+        }
+
+        final Pair<Integer, Integer> posPair = this.indexDB.lookupFirstLogPosFromLogIndex(1);
+        final int firstSegmentPos = posPair.getFirst();
+        final int firstConfPos = posPair.getSecond();
+        assertEquals(1, firstSegmentPos);
+        assertEquals(3, firstConfPos);
+    }
+
+    @Test
+    public void testLookupLastLogIndexAndPosFromTail() {
+        this.indexDB.startServiceManager();
+        {
+            this.indexDB.appendIndexAsync(1, 1, IndexType.IndexSegment);
+            this.indexDB.appendIndexAsync(2, 2, IndexType.IndexSegment);
+            final Pair<Integer, Long> posPair = this.indexDB.appendIndexAsync(3, 3, IndexType.IndexConf);
+            this.indexDB.appendIndexAsync(4, 4, IndexType.IndexSegment);
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+        }
+        final Pair<IndexFile.IndexEntry, IndexFile.IndexEntry> indexPair = this.indexDB
+            .lookupLastLogIndexAndPosFromTail();
+        final IndexFile.IndexEntry lastSegmentIndex = indexPair.getFirst();
+        final IndexFile.IndexEntry lastConfIndex = indexPair.getSecond();
+        assert (lastSegmentIndex.getLogIndex() == 4);
+        assert (lastConfIndex.getLogIndex() == 3);
+    }
+
+    @Test
+    public void testRecoverNormal() throws Exception {
+        this.testAppendIndex();
+        {
+            // Try to shutdown and recover , check flush position
+            this.indexDB.shutdown();
+            this.init();
+            this.indexDB.recover();
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+    @Test
+    public void testRecoverAbNormal() throws Exception {
+        this.testAppendIndex();
+        {
+            // Try to shutdown and recover , check flush position
+            this.indexDB.shutdown();
+            this.init();
+            // Create abort file
+            this.abortFile.create();
+            this.indexDB.recover();
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
new file mode 100644
index 0000000000..13542f0bca
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.SegmentLogDB;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SegmentLogDBTest extends BaseLogitStorageTest {
+    private SegmentLogDB segmentLogDB;
+    private String       segmentStorePath;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.segmentStorePath = this.path + File.separator + "segment";
+        Files.createDirectories(Path.of(segmentStorePath));
+        this.init();
+    }
+
+    public void init() {
+        this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
+        this.segmentLogDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.segmentLogDB.shutdown();
+    }
+
+    @Test
+    public void testAppendLog() throws Exception {
+        this.segmentLogDB.startServiceManager();
+        // The default file size is 300
+        // One entry size = 24 + 6 = 30, so this case will create three log file
+        Pair<Integer, Long> posPair = null;
+        for (int i = 0; i < 20; i++) {
+            final byte[] data = genData(i, 0, 24);
+            posPair = this.segmentLogDB.appendLogAsync(i, data);
+        }
+        this.segmentLogDB.waitForFlush(posPair.getSecond(), 100);
+        assertEquals(0, this.segmentLogDB.getFirstLogIndex());
+        assertEquals(19, this.segmentLogDB.getLastLogIndex());
+        assertEquals((600 + 26 + 2 * 30), this.segmentLogDB.getFlushedPosition());
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        testAppendLog();
+        // Read from the fifth entry, pos = 26 + 30 * 4 = 146
+        final AbstractDB.LogEntryIterator iterator = this.segmentLogDB.iterator(LogEntryV1CodecFactory.getInstance()
+            .decoder(), 5, 146);
+        LogEntry entry;
+        int index = 4;
+        while ((entry = iterator.next()) != null) {
+            assertEquals(index, entry.getId().getIndex());
+            index++;
+        }
+    }
+
+    @Test
+    public void testRecover() throws Exception {
+        this.segmentLogDB.startServiceManager();
+        final byte[] data = genData(1, 0, 150);
+        final byte[] data2 = genData(2, 0, 100);
+        final byte[] data3 = genData(3, 0, 100);
+        {
+            // Write first file , one segment file size = 300
+            this.segmentLogDB.appendLogAsync(1, data);
+            this.segmentLogDB.appendLogAsync(2, data2);
+            // Write second file
+            final Pair<Integer, Long> posPair = this.segmentLogDB.appendLogAsync(3, data3);
+            this.segmentLogDB.waitForFlush(posPair.getSecond(), 100);
+        }
+
+        final byte[] log = this.segmentLogDB.lookupLog(3, this.headerSize);
+        assertArrayEquals(data3, log);
+        {
+            this.segmentLogDB.shutdown();
+            this.init();
+            this.segmentLogDB.recover();
+            // Last flush position = one segment file size (300) + header(26) + log3Size(2 + 4 + 100) = 432
+            assertEquals(432, this.segmentLogDB.getFlushedPosition());
+        }
+        {
+            final byte[] log1 = this.segmentLogDB.lookupLog(1, this.headerSize);
+            assertArrayEquals(data, log1);
+            final byte[] log3 = this.segmentLogDB.lookupLog(3, this.headerSize);
+            assertArrayEquals(data3, log3);
+        }
+    }
+
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java
new file mode 100644
index 0000000000..b3de55fcf9
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Use indexFile to test fileManager
+ */
+public class FileManagerTest extends BaseLogitStorageTest {
+    private FileManager fileManager;
+    private AllocateFileService allocateService;
+    private String              indexStorePath;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.indexStorePath = this.path + File.separator + "index";
+        Files.createDirectories(Path.of(indexStorePath));
+        this.allocateService = new AllocateFileService(FileType.FILE_INDEX, indexStorePath, this.logStoreFactory);
+        this.allocateService.start();
+        this.fileManager = this.logStoreFactory.newFileManager(FileType.FILE_INDEX, indexStorePath,
+            this.allocateService);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.fileManager.shutdown();
+        this.allocateService.shutdown(true);
+    }
+
+    /**
+     * When call writeDataToFirstFile and writeDataToSecondFile
+     * The fileManager's file state is :
+     *
+     * fileId   fileFromOffset    firstLogIndex  lastLogIndex  fileLastOffset         wrotePosition
+     * 0        0                 0              9             26 + 100 =216          26 + 100
+     * 1        26 + 100          10             15            26 + 26 + 160 = 212    26 + 60
+     */
+    @Test
+    public void writeDataToFirstFile() {
+        // Append 10 index to first file , and come to the file end (size:130)
+        {
+            for (int i = 0; i < 10; i++) {
+                final AbstractFile lastFile = this.fileManager.getLastFile(i, 10, true);
+                assert (lastFile instanceof IndexFile);
+                final IndexFile indexFile = (IndexFile) lastFile;
+                indexFile.appendIndex(i, i, segmentIndex);
+            }
+        }
+    }
+
+    @Test
+    public void writeDataToSecondFile() {
+        writeDataToFirstFile();
+
+        // Try get last file again , this file is a new blank file (from allocator)
+        final AbstractFile lastFile = this.fileManager.getLastFile(10, 10, true);
+        assertEquals(this.indexFileSize, lastFile.getFileFromOffset());
+
+        // Write 5 index to second file , wrotePosition = 30 + 50
+        final IndexFile indexFile = (IndexFile) lastFile;
+        for (int i = 10; i <= 15; i++) {
+            indexFile.appendIndex(i, i, segmentIndex);
+        }
+    }
+
+    @Test
+    public void testFindAbstractFileByOffset() {
+        writeDataToSecondFile();
+        {
+            // Test find first file by offset 0
+            final AbstractFile firstFile = this.fileManager.findFileByOffset(30, false);
+            assertEquals(0, firstFile.getFileFromOffset());
+            // Test find second file by offset 136
+            final AbstractFile secondFile = this.fileManager.findFileByOffset(this.indexFileSize + 10, false);
+            assertEquals(this.indexFileSize, secondFile.getFileFromOffset());
+        }
+    }
+
+    @Test
+    public void testFlush() {
+        writeDataToSecondFile();
+
+        {
+            // First time  flush , flush position = indexFileSize (126)
+            this.fileManager.flush();
+            assertEquals(126, this.fileManager.getFlushedPosition());
+        }
+
+        {
+            // Second time  flush , flush position = 212
+            this.fileManager.flush();
+            assertEquals(212, this.fileManager.getFlushedPosition());
+        }
+    }
+
+    @Test
+    public void testTruncateSuffix() {
+        // Current flush position = 212
+        testFlush();
+        // Test truncate to logIndex = 10 , that means update flush position to 126 + 26 + 10 = 162
+        this.fileManager.truncateSuffix(10, 0);
+        assertEquals(162, this.fileManager.getFlushedPosition());
+        assertEquals(10, this.fileManager.getLastLogIndex());
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
new file mode 100644
index 0000000000..0821aa78ea
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file.index;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class IndexFileTest extends BaseLogitStorageTest {
+
+    private static final int  FILE_SIZE   = 10 * 10 + FileHeader.HEADER_SIZE;
+    private IndexFile offsetIndex;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.init();
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.offsetIndex.shutdown(1000);
+    }
+
+    private void init() {
+        final String filePath = this.path + File.separator + "IndexFileTest";
+        this.offsetIndex = new IndexFile(filePath, FILE_SIZE);
+    }
+
+    private final IndexEntry appendEntry0 = new IndexEntry(0, 1);
+    private final IndexEntry appendEntry1 = new IndexEntry(1, 2);
+    private final IndexEntry appendEntry2 = new IndexEntry(2, 3);
+
+    @Test
+    public void testAppendIndex() {
+        this.offsetIndex.appendIndex(appendEntry0.getOffset(), appendEntry0.getPosition(), segmentIndex);
+        this.offsetIndex.appendIndex(appendEntry1.getOffset(), appendEntry1.getPosition(), segmentIndex);
+        this.offsetIndex.appendIndex(appendEntry2.getOffset(), appendEntry2.getPosition(), segmentIndex);
+        this.offsetIndex.flush();
+        assertEquals(this.offsetIndex.getLastLogIndex(), appendEntry2.getOffset());
+    }
+
+    @Test
+    public void testLooUp() {
+        testAppendIndex();
+
+        final IndexEntry entry0 = this.offsetIndex.lookupIndex(appendEntry0.getOffset());
+        assertEquals(appendEntry0.getOffset(), entry0.getOffset());
+
+        final IndexEntry entry1 = this.offsetIndex.lookupIndex(appendEntry1.getOffset());
+        assertEquals(appendEntry1.getOffset(), entry1.getOffset());
+
+        final IndexEntry entry2 = this.offsetIndex.lookupIndex(appendEntry2.getOffset());
+        assertEquals(appendEntry2.getOffset(), entry2.getOffset());
+    }
+
+    @Test
+    public void testTruncate() {
+        // Append 10 index entry
+        for (int idx = 1; idx <= 10; idx++) {
+            this.offsetIndex.appendIndex(idx, idx, segmentIndex);
+        }
+
+        // Check truncate to 9
+        {
+            this.offsetIndex.truncate(9, 0);
+            assertEquals(8, this.offsetIndex.getLastLogIndex());
+        }
+
+        // Check truncate to 5
+        {
+            this.offsetIndex.truncate(5, 0);
+            // Test recover
+            this.offsetIndex.shutdown(1000);
+            this.init();
+            this.offsetIndex.recover();
+            assertEquals(4, this.offsetIndex.getLastLogIndex());
+        }
+    }
+
+    @Test
+    public void testRecoverFromInvalidData() throws Exception {
+        testAppendIndex();
+        // Reopen
+        {
+            this.offsetIndex.shutdown(1000);
+            this.init();
+            this.offsetIndex.recover();
+            assertEquals(this.headerSize + 30, this.offsetIndex.getWrotePosition());
+            assertEquals(2, this.offsetIndex.getLastLogIndex());
+
+            // Test lookup,all data is valid.
+            final IndexEntry entry1 = this.offsetIndex.lookupIndex(appendEntry1.getOffset());
+            assertEquals(appendEntry1.getOffset(), entry1.getOffset());
+        }
+        {
+            this.offsetIndex.shutdown(1000);
+            // Cleared data after pos= 46, the third data will be truncated when recovering.
+            try (FileOutputStream out = new FileOutputStream(new File(this.offsetIndex.getFilePath()), true);
+                    FileChannel outChan = out.getChannel()) {
+                outChan.truncate(this.headerSize + 20);
+                out.flush();
+            }
+            this.init();
+            this.offsetIndex.recover();
+            assertEquals(this.headerSize + 20, this.offsetIndex.getWrotePosition());
+            assertEquals(1, this.offsetIndex.getLastLogIndex());
+        }
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
new file mode 100644
index 0000000000..4463927239
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file.segment;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SegmentFileTest extends BaseLogitStorageTest {
+
+    private static final int FILE_SIZE = 64 + FileHeader.HEADER_SIZE;
+    private SegmentFile segmentFile;
+
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.init();
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.segmentFile.shutdown(1000);
+    }
+
+    private void init() {
+        final String filePath = this.path + File.separator + "IndexFileTest";
+        this.segmentFile = new SegmentFile(filePath, FILE_SIZE);
+    }
+
+    @Test
+    public void testAppendDataAndRead() {
+        {
+            // Write 32 bytes data
+            final byte[] data = genData(0, 0, 32);
+            int firstWritePos = FileHeader.HEADER_SIZE;
+            assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data)));
+            assertEquals(firstWritePos, this.segmentFile.appendData(0, data));
+            // Can't read before sync
+            this.segmentFile.flush();
+            assertArrayEquals(data, this.segmentFile.lookupData(0, firstWritePos));
+        }
+
+        {
+            // Write 20 bytes data, length = 6 + 14 = 20
+            final byte[] data2 = genData(1, 0, 20);
+            int nextWrotePos = FileHeader.HEADER_SIZE + 38;
+            assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data2)));
+            assertEquals(nextWrotePos, this.segmentFile.appendData(1, data2));
+            // Can't read before sync
+            this.segmentFile.flush();
+            assertArrayEquals(data2, this.segmentFile.lookupData(1, nextWrotePos));
+        }
+    }
+
+    @Test
+    public void testRecoverFromInvalidData() throws IOException {
+        testAppendDataAndRead();
+        int firstWritePos = FileHeader.HEADER_SIZE;
+        {
+            // Restart segment file, all data is valid.
+            this.segmentFile.shutdown(1000);
+            this.init();
+            this.segmentFile.recover();
+            assertEquals(32, this.segmentFile.lookupData(0, firstWritePos).length);
+            assertEquals(20, this.segmentFile.lookupData(1, 38 + firstWritePos).length);
+        }
+
+        {
+            this.segmentFile.shutdown(1000);
+            try (FileOutputStream out = new FileOutputStream(new File(this.segmentFile.getFilePath()), true);
+                    FileChannel outChan = out.getChannel()) {
+                // Cleared data after pos=64, the second data will be truncated when recovering.
+                outChan.truncate(64);
+            }
+            this.init();
+            this.segmentFile.recover();
+            // First data is still valid
+            assertEquals(32, this.segmentFile.lookupData(0, firstWritePos).length);
+            // The second data is truncated.
+            assertNull(this.segmentFile.lookupData(1, 38 + firstWritePos));
+        }
+    }
+
+    @Test
+    public void testTruncate() {
+        testAppendDataAndRead();
+        int truncatePos = FileHeader.HEADER_SIZE + 38;
+        this.segmentFile.truncate(1, truncatePos);
+
+        // Recover
+        this.segmentFile.shutdown(1000);
+        this.init();
+        this.segmentFile.recover();
+
+        assertEquals(0, this.segmentFile.getLastLogIndex());
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
index 4753c1947f..3dbc1d752f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
@@ -52,7 +52,7 @@ public class LocalSnapshotMetaTableTest {
     public void testAddRemove() {
         LocalFileMetaOutter.LocalFileMeta meta = msgFactory.localFileMeta()
             .checksum("test")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertEquals(0, table.listFiles().size());
         assertTrue(this.table.addFile("data", meta));
@@ -70,12 +70,12 @@ public class LocalSnapshotMetaTableTest {
     public void testSaveLoadFile(@WorkDirectory Path workDir) throws IOException {
         LocalFileMetaOutter.LocalFileMeta meta1 = msgFactory.localFileMeta()
             .checksum("data1")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertTrue(this.table.addFile("data1", meta1));
         LocalFileMetaOutter.LocalFileMeta meta2 = msgFactory.localFileMeta()
             .checksum("data2")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertTrue(this.table.addFile("data2", meta2));
 
@@ -105,12 +105,12 @@ public class LocalSnapshotMetaTableTest {
     public void testSaveLoadIoBuffer() throws Exception {
         LocalFileMetaOutter.LocalFileMeta meta1 = msgFactory.localFileMeta()
             .checksum("data1")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertTrue(this.table.addFile("data1", meta1));
         LocalFileMetaOutter.LocalFileMeta meta2 = msgFactory.localFileMeta()
             .checksum("data2")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertTrue(this.table.addFile("data2", meta2));
 
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
index 10b9a4cc89..0e1e3f4dc7 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
@@ -66,14 +66,14 @@ public class LocalSnapshotWriterTest extends BaseStorageTest {
         LocalFileMetaOutter.LocalFileMeta meta = opts.getRaftMessagesFactory()
             .localFileMeta()
             .checksum("test")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         assertTrue(this.writer.addFile("data1", meta));
         assertTrue(this.writer.addFile("data2"));
 
         assertEquals(meta, this.writer.getFileMeta("data1"));
         assertNull(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).checksum());
-        assertFalse(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).hasUserMeta());
+        assertNull(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).userMeta());
 
         this.writer.sync();
         //create a new writer
@@ -82,7 +82,7 @@ public class LocalSnapshotWriterTest extends BaseStorageTest {
         assertNotSame(writer, newWriter);
         assertEquals(meta, newWriter.getFileMeta("data1"));
         assertNull(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).checksum());
-        assertFalse(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).hasUserMeta());
+        assertNull(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).userMeta());
     }
 
     @Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
index 705c75fa91..24cd8506af 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
@@ -60,7 +60,7 @@ public class SnapshotFileReaderTest extends BaseStorageTest {
         final LocalFileMetaOutter.LocalFileMeta meta = opts.getRaftMessagesFactory()
             .localFileMeta()
             .checksum("test")
-            .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+            .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
             .build();
         this.metaTable.addFile("data", meta);
         return meta;