You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2023/09/25 14:31:13 UTC
[ignite-3] branch main updated: IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f2fc9528fb IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)
f2fc9528fb is described below
commit f2fc9528fb3e00a5ee67845c8927b8b9172a3748
Author: Ivan Bessonov <be...@gmail.com>
AuthorDate: Mon Sep 25 17:31:07 2023 +0300
IGNITE-19806 Port new log storage (PR #696) from "sofa-jraft" (#2245)
---
build.gradle | 13 +
gradle/libs.versions.toml | 4 +
.../apache/ignite/internal/util/IgniteUtils.java | 24 +
.../ignite/internal/util/IgniteUtilsTest.java | 15 +
modules/raft/build.gradle | 1 +
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 11 +-
.../raft/jraft/entity/LocalFileMetaOutter.java | 31 +-
.../ignite/raft/jraft/storage/logit/LibC.java | 59 +++
.../storage/logit/LogitLogStorageFactory.java | 70 +++
.../jraft/storage/logit/option/StoreOptions.java | 120 +++++
.../raft/jraft/storage/logit/package-info.java | 22 +
.../storage/logit/storage/LogitLogStorage.java | 554 ++++++++++++++++++++
.../jraft/storage/logit/storage/db/AbstractDB.java | 415 +++++++++++++++
.../jraft/storage/logit/storage/db/ConfDB.java | 40 ++
.../jraft/storage/logit/storage/db/IndexDB.java | 147 ++++++
.../storage/logit/storage/db/SegmentLogDB.java | 40 ++
.../logit/storage/factory/LogStoreFactory.java | 100 ++++
.../storage/logit/storage/file/AbstractFile.java | 523 +++++++++++++++++++
.../storage/logit/storage/file/FileHeader.java | 112 ++++
.../storage/logit/storage/file/FileManager.java | 577 +++++++++++++++++++++
.../jraft/storage/logit/storage/file/FileType.java | 43 ++
.../logit/storage/file/assit/AbortFile.java | 73 +++
.../logit/storage/file/assit/Checkpoint.java | 80 +++
.../file/assit/FirstLogIndexCheckpoint.java | 68 +++
.../storage/file/assit/FlushStatusCheckpoint.java | 81 +++
.../logit/storage/file/index/IndexFile.java | 280 ++++++++++
.../logit/storage/file/index/IndexType.java | 34 ++
.../logit/storage/file/segment/SegmentFile.java | 209 ++++++++
.../logit/storage/service/AllocateFileService.java | 157 ++++++
.../logit/storage/service/ServiceManager.java | 79 +++
.../ignite/raft/jraft/storage/logit/util/Pair.java | 47 ++
.../logit/util/concurrent/ReferenceResource.java | 84 +++
.../logit/util/concurrent/ShutdownAbleThread.java | 82 +++
.../snapshot/local/LocalSnapshotWriter.java | 5 +-
.../apache/ignite/raft/jraft/util/Platform.java | 19 +
.../jraft/storage/impl/BaseLogStorageTest.java | 20 +-
.../raft/jraft/storage/io/MessageFileTest.java | 12 +-
.../jraft/storage/logit/BaseLogitStorageTest.java | 77 +++
.../jraft/storage/logit/LogitLogStorageTest.java | 126 +++++
.../raft/jraft/storage/logit/db/ConfDBTest.java | 117 +++++
.../raft/jraft/storage/logit/db/IndexDBTest.java | 154 ++++++
.../jraft/storage/logit/db/SegmentLogDBTest.java | 121 +++++
.../jraft/storage/logit/file/FileManagerTest.java | 136 +++++
.../storage/logit/file/index/IndexFileTest.java | 135 +++++
.../logit/file/segment/SegmentFileTest.java | 125 +++++
.../snapshot/local/LocalSnapshotMetaTableTest.java | 10 +-
.../snapshot/local/LocalSnapshotWriterTest.java | 6 +-
.../snapshot/local/SnapshotFileReaderTest.java | 2 +-
48 files changed, 5210 insertions(+), 50 deletions(-)
diff --git a/build.gradle b/build.gradle
index bd9d33f11f..42f58c4e0a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -19,6 +19,7 @@
//This need for resolving plugins in buildscripts
plugins {
id('idea')
+ alias(libs.plugins.ideaext)
alias(libs.plugins.javacc) apply false
alias(libs.plugins.modernizer) apply false
alias(libs.plugins.aggregateJavadoc)
@@ -45,6 +46,10 @@ ext {
]
}
+def compilerArgs = [
+ "--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
+]
+
allprojects {
group 'org.apache.ignite'
version = "3.0.0-SNAPSHOT"
@@ -89,6 +94,7 @@ allprojects {
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
+ options.compilerArgs += compilerArgs
}
//Temporary hack to disable caching of Test tasks.
@@ -127,3 +133,10 @@ subprojects {
tasks.register('printSubDependencies', DependencyReportTask)
}
+
+idea.project.settings {
+ compiler.javac {
+ // Workaround for https://youtrack.jetbrains.com/issue/IDEA-154038: hand the extra javac arguments to the IDE compiler as well.
+ javacAdditionalOptions = compilerArgs.join(' ')
+ }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 9b8e21c259..320789e57e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -78,6 +78,7 @@ autoService = "1.0.1"
awaitility = "4.2.0"
progressBar = "0.9.4"
guava = "31.1-jre"
+jna = "5.13.0"
#Tools
pmdTool = "6.55.0"
@@ -95,6 +96,7 @@ docker = "com.palantir.docker:0.34.0"
checksum = "org.gradle.crypto.checksum:1.4.0"
setupbuilder = "de.inetsoftware.setupbuilder:7.2.13"
aggregateJavadoc = "io.freefair.aggregate-javadoc:6.5.1"
+ideaext = "org.jetbrains.gradle.plugin.idea-ext:1.1.7"
[libraries]
assertj-core = { module = "org.assertj:assertj-core", version.ref = "assertj" }
@@ -251,3 +253,5 @@ auto-service-annotations = { module = "com.google.auto.service:auto-service-anno
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
progressBar = { module = "me.tongfei:progressbar", version.ref = "progressBar" }
+
+jna = { module = "net.java.dev.jna:jna", version.ref = "jna"}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 1ea9fed961..5800ac4b2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
+import static java.util.Arrays.copyOfRange;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
@@ -1068,6 +1069,29 @@ public class IgniteUtils {
return result;
}
+ /**
+ * Converts byte buffer into a byte array. The content of the array is all bytes from the position "0" to "capacity". Preserves original
+ * position/limit values in the buffer. Always returns a new instance, instead of accessing the internal buffer's array.
+ */
+ public static byte[] byteBufferToByteArray(ByteBuffer buffer) {
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset();
+
+ return copyOfRange(buffer.array(), offset, offset + buffer.capacity());
+ } else {
+ byte[] array = new byte[buffer.capacity()];
+
+ int originalPosition = buffer.position();
+
+ buffer.position(0);
+ buffer.get(array);
+
+ buffer.position(originalPosition);
+
+ return array;
+ }
+ }
+
/**
* Stops all ignite components.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 7768be4889..b7199bc81d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
import static org.apache.ignite.internal.util.IgniteUtils.copyStateTo;
import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
@@ -30,6 +31,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -40,6 +42,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -200,4 +203,16 @@ class IgniteUtilsTest extends BaseIgniteAbstractTest {
assertThat(result, willThrow(NumberFormatException.class));
}
+
+ /** Checks that slices of both a heap buffer and a direct buffer are copied from their own index 0 up to their capacity. */
+ @Test
+ void testByteBufferToByteArray() {
+     byte[] source = {0, 1, 2, 3, 4};
+     byte[] expected = {1, 2, 3};
+
+     // Slice of an array-backed buffer.
+     ByteBuffer heapSlice = ByteBuffer.wrap(source, 1, 3).slice();
+
+     assertArrayEquals(expected, byteBufferToByteArray(heapSlice));
+
+     // Slice of a direct buffer.
+     ByteBuffer direct = ByteBuffer.allocateDirect(source.length);
+     direct.put(source);
+     direct.position(1).limit(4);
+
+     ByteBuffer directSlice = direct.slice();
+
+     assertArrayEquals(expected, byteBufferToByteArray(directSlice));
+ }
}
diff --git a/modules/raft/build.gradle b/modules/raft/build.gradle
index f37d6dd86d..e871919f8b 100644
--- a/modules/raft/build.gradle
+++ b/modules/raft/build.gradle
@@ -36,6 +36,7 @@ dependencies {
implementation libs.dropwizard.metrics
implementation libs.jctools.core
implementation libs.auto.service.annotations
+ implementation libs.jna
annotationProcessor project(":ignite-network-annotation-processor")
annotationProcessor libs.auto.service
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 61eb954321..66176c3354 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.jraft.core;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.synchronizedList;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.util.IgniteUtils.byteBufferToByteArray;
import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.apache.ignite.raft.jraft.test.TestUtils.sender;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -2732,13 +2733,13 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
UserLog userLog = leader.readCommittedUserLog(1);
assertNotNull(userLog);
assertEquals(2, userLog.getIndex());
- assertEquals("hello0", stringFromBytes(userLog.getData().array()));
+ assertEquals("hello0", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// index == 5 is a DATA log(a user log)
userLog = leader.readCommittedUserLog(5);
assertNotNull(userLog);
assertEquals(5, userLog.getIndex());
- assertEquals("hello3", stringFromBytes(userLog.getData().array()));
+ assertEquals("hello3", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// index == 15 is greater than last_committed_index
try {
@@ -2795,20 +2796,20 @@ public class ItNodeTest extends BaseIgniteAbstractTest {
userLog = leader.readCommittedUserLog(12);
assertNotNull(userLog);
assertEquals(16, userLog.getIndex());
- assertEquals("hello10", stringFromBytes(userLog.getData().array()));
+ assertEquals("hello10", stringFromBytes(byteBufferToByteArray(userLog.getData())));
// now index == 17 is a user log
userLog = leader.readCommittedUserLog(17);
assertNotNull(userLog);
assertEquals(17, userLog.getIndex());
- assertEquals("hello11", stringFromBytes(userLog.getData().array()));
+ assertEquals("hello11", stringFromBytes(byteBufferToByteArray(userLog.getData())));
cluster.ensureSame();
assertEquals(3, cluster.getFsms().size());
for (MockStateMachine fsm : cluster.getFsms()) {
assertEquals(20, fsm.getLogs().size());
for (int i = 0; i < 20; i++)
- assertEquals("hello" + i, stringFromBytes(fsm.getLogs().get(i).array()));
+ assertEquals("hello" + i, stringFromBytes(byteBufferToByteArray(fsm.getLogs().get(i))));
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
index 6a3b09600d..e32a6e54ae 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
@@ -31,39 +31,29 @@ public final class LocalFileMetaOutter {
*/
public enum FileSource {
/**
- * <code>FILE_SOURCE_LOCAL = 0;</code>
+ * <code>FILE_SOURCE_LOCAL = 1;</code>
*/
- FILE_SOURCE_LOCAL(0),
+ FILE_SOURCE_LOCAL(1),
/**
- * <code>FILE_SOURCE_REFERENCE = 1;</code>
+ * <code>FILE_SOURCE_REFERENCE = 2;</code>
*/
- FILE_SOURCE_REFERENCE(1);
+ FILE_SOURCE_REFERENCE(2);
public final int getNumber() {
return value;
}
- /**
- * @deprecated Use {@link #forNumber(int)} instead.
- */
- @Deprecated
- public static FileSource valueOf(int value) {
- return forNumber(value);
- }
-
public static FileSource forNumber(int value) {
switch (value) {
- case 0:
- return FILE_SOURCE_LOCAL;
case 1:
+ return FILE_SOURCE_LOCAL;
+ case 2:
return FILE_SOURCE_REFERENCE;
default:
return null;
}
}
- private static final FileSource[] VALUES = values();
-
private final int value;
private FileSource(int value) {
@@ -73,13 +63,16 @@ public final class LocalFileMetaOutter {
@Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_FILE_META)
public interface LocalFileMeta extends Message {
+ int sourceNumber();
+
@Nullable
- @Marshallable
- FileSource source();
+ default FileSource source() {
+ return FileSource.forNumber(sourceNumber());
+ }
@Nullable
String checksum();
- boolean hasUserMeta();
+ byte @Nullable[] userMeta();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java
new file mode 100644
index 0000000000..cb61f04d4c
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LibC.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Platform;
+import com.sun.jna.Pointer;
+
+/**
+ * JNA library interface declaring a few native C library functions and related flag constants. Moved from rocketmq.
+ *
+ * https://raw.githubusercontent.com/apache/rocketmq/master/store/src/main/java/org/apache/rocketmq/store/util/LibC.java
+ */
+public interface LibC extends Library {
+    /** Shared binding, loaded from the "msvcrt" library on Windows and from the "c" library on any other platform. */
+    LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
+
+    // Values for the "advice" argument of madvise.
+    int MADV_WILLNEED = 3;
+    int MADV_DONTNEED = 4;
+
+    // Values for the "flags" argument of mlockall.
+    int MCL_CURRENT = 1;
+    int MCL_FUTURE = 2;
+    int MCL_ONFAULT = 4;
+
+    /** Sync memory asynchronously. */
+    int MS_ASYNC = 1;
+
+    /** Invalidate mappings & caches. */
+    int MS_INVALIDATE = 2;
+
+    /** Synchronous memory sync. */
+    int MS_SYNC = 4;
+
+    /** Declaration of the native {@code mlock} function. */
+    int mlock(Pointer address, NativeLong length);
+
+    /** Declaration of the native {@code munlock} function. */
+    int munlock(Pointer address, NativeLong length);
+
+    /** Declaration of the native {@code madvise} function; see the {@code MADV_*} constants for {@code advice}. */
+    int madvise(Pointer address, NativeLong length, int advice);
+
+    /** Declaration of the native {@code memset} function. */
+    Pointer memset(Pointer address, int value, long length);
+
+    /** Declaration of the native {@code mlockall} function; see the {@code MCL_*} constants for {@code flags}. */
+    int mlockall(int flags);
+
+    /** Declaration of the native {@code msync} function; see the {@code MS_*} constants for {@code flags}. */
+    int msync(Pointer address, NativeLong length, int flags);
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
new file mode 100644
index 0000000000..bc66237992
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import java.nio.file.Paths;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.LogitLogStorage;
+import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.StringUtils;
+
+public class LogitLogStorageFactory implements LogStorageFactory {
+ private static final IgniteLogger LOG = Loggers.forClass(LogitLogStorageFactory.class);
+
+ public static final String NEW_STORAGE_PATH = "LogitStorage";
+
+ /** Executor for shared storages. */
+ protected final ExecutorService executorService;
+
+ private final StoreOptions storeOptions;
+
+ public LogitLogStorageFactory(StoreOptions storeOptions) {
+ this.storeOptions = storeOptions;
+ executorService = Executors.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors() * 2,
+ new NamedThreadFactory("raft-shared-log-storage-pool", LOG)
+ );
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public LogStorage createLogStorage(String uri, RaftOptions raftOptions) {
+ Requires.requireTrue(StringUtils.isNotBlank(uri), "Blank log storage uri.");
+
+ String newStoragePath = Paths.get(uri, NEW_STORAGE_PATH).toString();
+ return new LogitLogStorage(newStoragePath, storeOptions, raftOptions, executorService);
+ }
+
+ @Override
+ public void close() {
+ ExecutorServiceHelper.shutdownAndAwaitTermination(executorService);
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
new file mode 100644
index 0000000000..855373a320
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/option/StoreOptions.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.option;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+
+/**
+ * Storage options: a mutable holder of the tunable parameters of the log storage.
+ */
+public class StoreOptions {
+    /** Storage path, fixed and not configurable. */
+    private static final String STORAGE_PATH = "localLog";
+
+    /** Size of a segment file. Default is 0.5G. */
+    private int segmentFileSize = 512 * 1024 * 1024;
+
+    /** Size of an index file. Default is the file header plus 5000000 index entries. */
+    private int indexFileSize = FileHeader.HEADER_SIZE + 5_000_000 * IndexEntry.INDEX_SIZE;
+
+    /** Size of a conf file. Default is 0.5G. */
+    private int confFileSize = 512 * 1024 * 1024;
+
+    /** Whether enable warm up file when pre allocate. */
+    private boolean enableWarmUpFile = true;
+
+    /** Pre allocate files. */
+    private int preAllocateFileCount = 2;
+
+    /** How many files can be kept in memory, default = preAllocateFileCount + 3. */
+    private int keepInMemoryFileCount = 5;
+
+    /** The max times for flush. */
+    private int maxFlushTimes = 200;
+
+    /** Checkpoint flush status interval. */
+    private int checkpointFlushStatusInterval = 5000;
+
+    /** Returns the fixed storage path. */
+    public String getStoragePath() {
+        return STORAGE_PATH;
+    }
+
+    /** Returns whether file warm up is enabled. */
+    public boolean isEnableWarmUpFile() {
+        return this.enableWarmUpFile;
+    }
+
+    /** Enables or disables file warm up. */
+    public void setEnableWarmUpFile(final boolean enableWarmUpFile) {
+        this.enableWarmUpFile = enableWarmUpFile;
+    }
+
+    /** Returns the segment file size. */
+    public int getSegmentFileSize() {
+        return this.segmentFileSize;
+    }
+
+    /** Sets the segment file size. */
+    public void setSegmentFileSize(final int segmentFileSize) {
+        this.segmentFileSize = segmentFileSize;
+    }
+
+    /** Returns the index file size. */
+    public int getIndexFileSize() {
+        return this.indexFileSize;
+    }
+
+    /** Sets the index file size. */
+    public void setIndexFileSize(final int indexFileSize) {
+        this.indexFileSize = indexFileSize;
+    }
+
+    /** Returns the conf file size. */
+    public int getConfFileSize() {
+        return this.confFileSize;
+    }
+
+    /** Sets the conf file size. */
+    public void setConfFileSize(final int confFileSize) {
+        this.confFileSize = confFileSize;
+    }
+
+    /** Returns the number of pre allocated files. */
+    public int getPreAllocateFileCount() {
+        return this.preAllocateFileCount;
+    }
+
+    /** Sets the number of pre allocated files. */
+    public void setPreAllocateFileCount(final int preAllocateFileCount) {
+        this.preAllocateFileCount = preAllocateFileCount;
+    }
+
+    /** Returns how many files can be kept in memory. */
+    public int getKeepInMemoryFileCount() {
+        return this.keepInMemoryFileCount;
+    }
+
+    /** Sets how many files can be kept in memory. */
+    public void setKeepInMemoryFileCount(final int keepInMemoryFileCount) {
+        this.keepInMemoryFileCount = keepInMemoryFileCount;
+    }
+
+    /** Returns the max times for flush. */
+    public int getMaxFlushTimes() {
+        return this.maxFlushTimes;
+    }
+
+    /** Sets the max times for flush. */
+    public void setMaxFlushTimes(final int maxFlushTimes) {
+        this.maxFlushTimes = maxFlushTimes;
+    }
+
+    /** Returns the checkpoint flush status interval. */
+    public int getCheckpointFlushStatusInterval() {
+        return this.checkpointFlushStatusInterval;
+    }
+
+    /** Sets the checkpoint flush status interval. */
+    public void setCheckpointFlushStatusInterval(final int checkpointFlushStatusInterval) {
+        this.checkpointFlushStatusInterval = checkpointFlushStatusInterval;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java
new file mode 100644
index 0000000000..4903e2fdd7
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains classes from {@code jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/}.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
new file mode 100644
index 0000000000..69df2d73a6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/LogitLogStorage.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.conf.Configuration;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB.LogEntryIterator;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.ConfDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.SegmentLogDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.FirstLogIndexCheckpoint;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.apache.ignite.raft.jraft.util.OnlyForTest;
+import org.apache.ignite.raft.jraft.util.Requires;
+import org.apache.ignite.raft.jraft.util.Utils;
+
+/**
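+ * A {@link LogStorage} implementation written in Java that keeps log entries in a segment log DB and a
+ * conf DB, and locates them through an index DB.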
+ */
+public class LogitLogStorage implements LogStorage {
+ private static final IgniteLogger LOG = Loggers.forClass(LogitLogStorage.class);
+
+ private static final String INDEX_STORE_PATH = "LogIndex";
+ private static final String SEGMENT_STORE_PATH = "LogSegment";
+ private static final String CONF_STORE_PATH = "LogConf";
+ private static final String FIRST_INDEX_CHECKPOINT = "FirstLogIndexCheckpoint";
+ private final FirstLogIndexCheckpoint firstLogIndexCheckpoint;
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = this.readWriteLock.readLock();
+ private final Lock writeLock = this.readWriteLock.writeLock();
+ private final StoreOptions storeOptions;
+ private final RaftOptions raftOptions;
+ private final String indexStorePath;
+ private final String segmentStorePath;
+ private final String confStorePath;
+ private ConfigurationManager configurationManager;
+ private LogEntryEncoder logEntryEncoder;
+ private LogEntryDecoder logEntryDecoder;
+ private SegmentLogDB segmentLogDB;
+ private IndexDB indexDB;
+ private ConfDB confDB;
+ private LogStoreFactory logStoreFactory;
+
+ /** Executor that handles prefix truncation. */
+ private final Executor executor;
+
+ /**
+ * Creates the storage rooted at {@code path}. Only paths and options are recorded here and the
+ * first-log-index checkpoint object is created; the DBs themselves are created in {@code init}.
+ *
+ * @param path base directory under which the index, segment, conf and checkpoint locations are resolved
+ * @param storeOptions store options handed to the {@link LogStoreFactory}
+ * @param raftOptions raft options handed to the checkpoint and to the {@link LogStoreFactory}
+ * @param executor executor that runs the background part of prefix truncation
+ */
+ public LogitLogStorage(final String path, final StoreOptions storeOptions, RaftOptions raftOptions, Executor executor) {
+ this.indexStorePath = Paths.get(path, INDEX_STORE_PATH).toString();
+ this.segmentStorePath = Paths.get(path, SEGMENT_STORE_PATH).toString();
+ this.confStorePath = Paths.get(path, CONF_STORE_PATH).toString();
+ this.storeOptions = storeOptions;
+ this.raftOptions = raftOptions;
+ this.executor = executor;
+ final String checkPointPath = Paths.get(path, FIRST_INDEX_CHECKPOINT).toString();
+ this.firstLogIndexCheckpoint = new FirstLogIndexCheckpoint(checkPointPath, raftOptions);
+ }
+
+ /**
+ * Initializes the storage: takes the codec and the configuration manager from {@code opts}, creates and
+ * initializes the index, segment and conf DBs, loads the first-log-index checkpoint and finally runs
+ * {@link #recoverAndLoad()}. The whole sequence runs under the write lock.
+ *
+ * @param opts storage options; the configuration manager and the codec factory are required
+ * @return {@code true} if the DBs were initialized and recovery succeeded; {@code false} if a DB failed to
+ *     initialize, recovery failed, or loading the checkpoint threw an {@link IOException}
+ */
+ @Override
+ public boolean init(final LogStorageOptions opts) {
+ Requires.requireNonNull(opts.getConfigurationManager(), "Null conf manager");
+ Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log entry codec factory");
+ this.writeLock.lock();
+ try {
+ this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+ this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+ this.configurationManager = opts.getConfigurationManager();
+
+ // Create dbs and recover
+ this.logStoreFactory = new LogStoreFactory(this.storeOptions, raftOptions);
+ this.indexDB = new IndexDB(this.indexStorePath);
+ this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
+ this.confDB = new ConfDB(this.confStorePath);
+ // The && chain stops at the first DB that fails to initialize, so later DBs may stay uninitialized.
+ if (!(this.indexDB.init(this.logStoreFactory) && this.segmentLogDB.init(this.logStoreFactory) && this.confDB
+ .init(this.logStoreFactory))) {
+ LOG.warn("Init dbs failed when startup logitLogStorage");
+ return false;
+ }
+
+ this.firstLogIndexCheckpoint.load();
+ return recoverAndLoad();
+ } catch (final IOException e) {
+ LOG.error("Error on load firstLogIndexCheckPoint", e);
+ } finally {
+ this.writeLock.unlock();
+ }
+ return false;
+ }
+
+ /**
+ * Recovers the three DBs, aligns the index DB with the log DBs, loads the stored configuration entries into
+ * the configuration manager and, if the first-log-index checkpoint is not initialized yet, saves the index
+ * DB's first log index into it. Runs under the write lock.
+ *
+ * @return {@code true} on success; {@code false} if the consistency check failed or any exception was thrown
+ */
+ public boolean recoverAndLoad() {
+ this.writeLock.lock();
+ try {
+ this.indexDB.recover();
+ this.segmentLogDB.recover();
+ this.confDB.recover();
+
+ // Check consistency
+ if (!checkConsistencyAndAlignLog()) {
+ LOG.warn("Check the consistency and align log failed");
+ return false;
+ }
+
+ // Load configuration to conf manager
+ loadConfiguration();
+
+ // Set first log index
+ if (!this.firstLogIndexCheckpoint.isInit()) {
+ saveFirstLogIndex(this.indexDB.getFirstLogIndex());
+ }
+ LOG.info("Recover dbs and start timingServer success, last recover index:{}",
+ this.indexDB.getLastLogIndex());
+ return true;
+ } catch (final Exception e) {
+ // Any failure (the DB recover() calls rethrow as RuntimeException) is reported as a failed recovery.
+ LOG.error("Error on recover db", e);
+ return false;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Checks that the index DB agrees with the segment and conf DBs and aligns it when it does not.
+ * The DBs are considered consistent when the last index of the index DB equals the last index of either
+ * log DB. If the index DB is ahead, it is truncated back; if it is behind, the missing index entries are
+ * rebuilt from the two log DBs and flushed.
+ *
+ * @return {@code true} if the DBs are consistent or were aligned successfully
+ */
+ private boolean checkConsistencyAndAlignLog() {
+ final long lastIndex = this.indexDB.getLastLogIndex();
+ final long lastSegmentIndex = this.segmentLogDB.getLastLogIndex();
+ final long lastConfIndex = this.confDB.getLastLogIndex();
+ if (lastIndex == lastSegmentIndex || lastIndex == lastConfIndex) {
+ return true;
+ }
+ final long maxLogIndex = Math.max(lastSegmentIndex, lastConfIndex);
+ if (lastIndex > maxLogIndex) {
+ // In this case, just align indexDB to the index of max(lastSegmentIndex, lastConfIndex)
+ // NOTE(review): the outcome depends on truncateSuffix's return value signalling success - verify.
+ return this.indexDB.truncateSuffix(maxLogIndex, 0);
+ } else {
+ // In this case, we should generate indexEntry array sorted by index from segmentDB and confDB, then
+ // store indexEntry array to indexDB
+
+ // Step1, lookup last (segment/conf) index in indexDB
+ final Pair<IndexEntry, IndexEntry> lastIndexPair = this.indexDB.lookupLastLogIndexAndPosFromTail();
+ IndexEntry lastSegmentIndexInfo = lastIndexPair.getFirst();
+ IndexEntry lastConfIndexInfo = lastIndexPair.getSecond();
+
+ /*
+ * There exists a bad case, for example
+ * The index db has index entries 1 ~ 12, but all of the index entries are log index, don't contain conf index
+ * The segmentLog db has logs 1 ~ 12 and 16 ~ 19, the conf db has logs 13 ~ 15.
+ * So in this case, the lastConfIndexInfo will be null, but we should set it to the first log position
+ */
+ if (lastSegmentIndexInfo == null) {
+ lastSegmentIndexInfo = new IndexEntry(this.segmentLogDB.getFirstLogIndex(), FileHeader.HEADER_SIZE,
+ IndexType.IndexSegment.getType());
+ }
+ if (lastConfIndexInfo == null) {
+ lastConfIndexInfo = new IndexEntry(this.confDB.getFirstLogIndex(), FileHeader.HEADER_SIZE,
+ IndexType.IndexConf.getType());
+ }
+
+ // Step2, Using two-way merging algorithm to construct ordered index entry array
+ final LogEntryIterator segmentLogIterator = this.segmentLogDB.iterator(this.logEntryDecoder,
+ lastSegmentIndexInfo.getLogIndex(), lastSegmentIndexInfo.getPosition());
+ final LogEntryIterator confLogIterator = this.confDB.iterator(this.logEntryDecoder,
+ lastConfIndexInfo.getLogIndex(), lastConfIndexInfo.getPosition());
+ final List<IndexEntry> indexArray = generateOrderedIndexArrayByMergingLogIterator(segmentLogIterator,
+ confLogIterator);
+
+ // Step3, store array to indexDB
+ long maxFlushPosition = this.indexDB.appendBatchIndexAsync(indexArray);
+
+ // Step4, wait for flushing indexDB
+ return this.indexDB.waitForFlush(maxFlushPosition, this.storeOptions.getMaxFlushTimes());
+ }
+ }
+
+ /**
+ * Generates an index entry list ordered by log index by two-way merging the entries of both iterators.
+ * Each index entry records the log index, the position reported by the iterator for that entry and the
+ * index type of the DB it came from. When both iterators offer an entry, the segment entry is taken only if
+ * its index is strictly smaller; otherwise the conf entry is taken.
+ *
+ * @param segmentLogIterator segment log iterator, may be {@code null}
+ * @param confLogIterator conf log iterator, may be {@code null}
+ * @return ordered index entry list, empty if neither iterator yields an entry
+ */
+ public List<IndexEntry> generateOrderedIndexArrayByMergingLogIterator(final LogEntryIterator segmentLogIterator,
+ final LogEntryIterator confLogIterator) {
+ LogEntry segmentEntry = null, confEntry = null;
+ int segmentPosition = -1, confPosition = -1;
+ final List<IndexEntry> indexEntries = new ArrayList<>();
+ while (true) {
+ // Pull next entry
+ // next() may return null even after hasNext() returned true; the entry then simply stays null.
+ if (segmentEntry == null && segmentLogIterator != null && segmentLogIterator.hasNext()) {
+ segmentEntry = segmentLogIterator.next();
+ segmentPosition = segmentLogIterator.getReadPosition();
+ }
+ if (confEntry == null && confLogIterator != null && confLogIterator.hasNext()) {
+ confEntry = confLogIterator.next();
+ confPosition = confLogIterator.getReadPosition();
+ }
+ if (segmentEntry == null && confEntry == null) {
+ break;
+ }
+ // Merge
+ if (segmentEntry != null && confEntry != null) {
+ if (segmentEntry.getId().getIndex() < confEntry.getId().getIndex()) {
+ indexEntries.add(new IndexEntry(segmentEntry.getId().getIndex(), segmentPosition,
+ IndexType.IndexSegment.getType()));
+ segmentEntry = null;
+ } else {
+ indexEntries.add(new IndexEntry(confEntry.getId().getIndex(), confPosition, IndexType.IndexConf
+ .getType()));
+ confEntry = null;
+ }
+ } else {
+ // Exactly one side still has an entry: emit it and clear both so the next round pulls again.
+ indexEntries.add(segmentEntry != null ? new IndexEntry(segmentEntry.getId().getIndex(),
+ segmentPosition, IndexType.IndexSegment.getType()) : new IndexEntry(confEntry.getId().getIndex(),
+ confPosition, IndexType.IndexConf.getType()));
+ segmentEntry = confEntry = null;
+ }
+ }
+ return indexEntries;
+ }
+
+ /**
+ * Stores {@code logIndex} in the first-log-index checkpoint and saves the checkpoint.
+ *
+ * @param logIndex the first log index to record
+ * @return the result of saving the checkpoint, or {@code false} if saving threw an {@link IOException}
+ */
+ private boolean saveFirstLogIndex(final long logIndex) {
+ boolean saved = false;
+ try {
+ this.firstLogIndexCheckpoint.setFirstLogIndex(logIndex);
+ saved = this.firstLogIndexCheckpoint.save();
+ } catch (final IOException ex) {
+ LOG.error("Error when save first log index", ex);
+ }
+ return saved;
+ }
+
+ /**
+ * Reads every entry of the conf DB and registers each configuration entry with the configuration manager.
+ * Iteration ends when the iterator returns {@code null}.
+ */
+ public void loadConfiguration() {
+ final LogEntryIterator it = this.confDB.iterator(this.logEntryDecoder);
+ for (LogEntry current = it.next(); current != null; current = it.next()) {
+ if (current.getType() != EntryType.ENTRY_TYPE_CONFIGURATION) {
+ continue;
+ }
+ final ConfigurationEntry parsed = new ConfigurationEntry();
+ parsed.setId(new LogId(current.getId().getIndex(), current.getId().getTerm()));
+ parsed.setConf(new Configuration(current.getPeers(), current.getLearners()));
+ // The old configuration is only present for some entries.
+ if (current.getOldPeers() != null) {
+ parsed.setOldConf(new Configuration(current.getOldPeers(), current.getOldLearners()));
+ }
+ if (this.configurationManager != null) {
+ this.configurationManager.add(parsed);
+ }
+ }
+ }
+
+ /**************************** Implementation ********************************/
+
+ /**
+ * Returns the first log index: the checkpointed value when it is non-negative, otherwise the index DB's
+ * first log index when that is non-negative, otherwise {@code 1}.
+ */
+ @Override
+ public long getFirstLogIndex() {
+ this.readLock.lock();
+ try {
+ if (this.firstLogIndexCheckpoint.firstLogIndex >= 0) {
+ return this.firstLogIndexCheckpoint.firstLogIndex;
+ }
+ if (this.indexDB.getFirstLogIndex() >= 0) {
+ return this.indexDB.getFirstLogIndex();
+ }
+ // Nothing recorded anywhere yet.
+ return 1L;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the last log index as reported by the index DB, or {@code 0} when the index DB reports a
+ * negative value.
+ */
+ @Override
+ public long getLastLogIndex() {
+ this.readLock.lock();
+ try {
+ // The index DB alone decides the last log index.
+ return this.indexDB.getLastLogIndex() >= 0 ? this.indexDB.getLastLogIndex() : 0;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Looks up the entry stored at {@code index}.
+ *
+ * @param index log index to read
+ * @return the decoded entry, or {@code null} if the index is outside {@code [firstLogIndex, lastLogIndex]},
+ *     the index DB has no position for it, or the log DB returns no data
+ */
+ @Override
+ public LogEntry getEntry(final long index) {
+ this.readLock.lock();
+ try {
+ final boolean outOfRange = index < getFirstLogIndex() || index > getLastLogIndex();
+ if (outOfRange) {
+ return null;
+ }
+ final IndexEntry located = this.indexDB.lookupIndex(index);
+ final int position = located.getPosition();
+ final byte type = located.getLogType();
+ if (position == -1) {
+ return null;
+ }
+ // The index entry's type says which DB holds the payload.
+ final AbstractDB holder = type == IndexType.IndexSegment.getType() ? this.segmentLogDB : this.confDB;
+ final byte[] raw = holder.lookupLog(index, position);
+ return raw == null ? null : this.logEntryDecoder.decode(raw);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the term of the entry at {@code index}, or {@code 0} when {@link #getEntry(long)} finds no entry.
+ */
+ @Override
+ public long getTerm(final long index) {
+ final LogEntry found = getEntry(index);
+ return found == null ? 0 : found.getId().getTerm();
+ }
+
+ /**
+ * Encodes and appends a single entry, waiting for it to be flushed. Configuration entries go to the conf DB,
+ * all other entries to the segment log DB.
+ *
+ * @param entry entry to append
+ * @return {@code true} if the entry and its index were appended and flushed
+ */
+ @Override
+ public boolean appendEntry(final LogEntry entry) {
+ this.readLock.lock();
+ try {
+ final long index = entry.getId().getIndex();
+ final byte[] encoded = this.logEntryEncoder.encode(entry);
+ final boolean isConf = entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION;
+ final AbstractDB target = isConf ? this.confDB : this.segmentLogDB;
+ final IndexType kind = isConf ? IndexType.IndexConf : IndexType.IndexSegment;
+ return doAppendEntry(index, encoded, target, kind, true);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Appends a batch of entries in list order. Configuration entries go to the conf DB, all others to the
+ * segment log DB. Only the last configuration entry and the last non-configuration entry of the batch wait
+ * for a flush; the remaining entries are appended without waiting. A failed append does not stop the batch.
+ *
+ * @param entries entries to append
+ * @return the number of entries for which the append reported success
+ */
+ @Override
+ public int appendEntries(final List<LogEntry> entries) {
+ this.readLock.lock();
+ try {
+ int appendCount = 0;
+ final int size = entries.size();
+
+ // Find last log and last conf log
+ // Note: these two values are positions within the list, not raft log indexes.
+ int lastLogIndex = -1;
+ int lastConfIndex = -1;
+ for (int i = entries.size() - 1; i >= 0; i--) {
+ final LogEntry entry = entries.get(i);
+ final boolean isConfEntry = (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION);
+ if (isConfEntry && lastConfIndex == -1) {
+ lastConfIndex = i;
+ } else if (!isConfEntry && lastLogIndex == -1) {
+ lastLogIndex = i;
+ }
+ if (lastConfIndex >= 0 && lastLogIndex >= 0) {
+ break;
+ }
+ }
+
+ for (int i = 0; i < size; i++) {
+ final boolean isWaitingFlush = (i == lastLogIndex || i == lastConfIndex);
+ final LogEntry entry = entries.get(i);
+ final long logIndex = entry.getId().getIndex();
+ final byte[] logData = this.logEntryEncoder.encode(entry);
+ if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
+ if (doAppendEntry(logIndex, logData, this.confDB, IndexType.IndexConf, isWaitingFlush)) {
+ appendCount++;
+ }
+ } else {
+ if (doAppendEntry(logIndex, logData, this.segmentLogDB, IndexType.IndexSegment, isWaitingFlush)) {
+ appendCount++;
+ }
+ }
+ }
+
+ return appendCount;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Appends the encoded entry to {@code logDB}, then appends the matching index entry to the index DB, and
+ * optionally waits for both to be flushed. If the first-log-index checkpoint is not initialized yet, the
+ * index of this entry is saved as the first log index.
+ *
+ * @param logIndex log index of the entry
+ * @param data encoded entry
+ * @param logDB DB that receives the entry data
+ * @param indexType index type recorded in the index entry
+ * @param isWaitingFlush whether to wait until the log DB and the index DB are flushed
+ * @return {@code false} if a DB is missing, an append reports a negative position, or the flush wait fails;
+ *     {@code true} otherwise
+ */
+ private boolean doAppendEntry(final long logIndex, final byte[] data, final AbstractDB logDB,
+ final IndexType indexType, final boolean isWaitingFlush) {
+ this.readLock.lock();
+ try {
+ if (logDB == null || this.indexDB == null) {
+ return false;
+ }
+
+ // Append log async , get position infos
+ // The pair holds the write position (first) and the expected flush position (second).
+ final Pair<Integer, Long> logPair = logDB.appendLogAsync(logIndex, data);
+ if (logPair.getFirst() < 0 || logPair.getSecond() < 0) {
+ return false;
+ }
+
+ final Pair<Integer, Long> indexPair = this.indexDB
+ .appendIndexAsync(logIndex, logPair.getFirst(), indexType);
+ if (indexPair.getFirst() < 0 || indexPair.getSecond() < 0) {
+ return false;
+ }
+
+ // Save first log index
+ if (!this.firstLogIndexCheckpoint.isInit()) {
+ saveFirstLogIndex(logIndex);
+ }
+
+ if (isWaitingFlush) {
+ return waitForFlush(logDB, logPair.getSecond(), indexPair.getSecond());
+ }
+ return true;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Waits until the given log DB, and then the index DB, are flushed up to the expected positions.
+ *
+ * @param logDB log DB to flush first
+ * @param expectedLogPosition flush position the log DB has to reach
+ * @param expectedIndexPosition flush position the index DB has to reach
+ * @return {@code true} only if both DBs reached their positions
+ */
+ private boolean waitForFlush(final AbstractDB logDB, final long expectedLogPosition,
+ final long expectedIndexPosition) {
+ final int flushAttempts = this.storeOptions.getMaxFlushTimes();
+ // The log db goes first: even if power fails right after it is flushed,
+ // the index db can be restored from the log db.
+ return logDB.waitForFlush(expectedLogPosition, flushAttempts)
+ && this.indexDB.waitForFlush(expectedIndexPosition, flushAttempts);
+ }
+
+ /**
+ * Drops all entries before {@code firstIndexKept}. The new first log index is saved to the checkpoint
+ * synchronously; the truncation of the three DBs is then handed to the executor.
+ *
+ * @param firstIndexKept first log index that must remain
+ * @return {@code true} if the new first log index was saved, {@code false} otherwise (nothing is truncated)
+ */
+ @Override
+ public boolean truncatePrefix(final long firstIndexKept) {
+ this.readLock.lock();
+ try {
+ if (!saveFirstLogIndex(firstIndexKept)) {
+ return false;
+ }
+ // The checkpoint already holds the new first index, so the DB cleanup can run in the background.
+ Utils.runInThread(executor, () -> {
+ this.indexDB.truncatePrefix(firstIndexKept);
+ this.segmentLogDB.truncatePrefix(firstIndexKept);
+ this.confDB.truncatePrefix(firstIndexKept);
+ });
+ return true;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Drops all entries after {@code lastIndexKept} from the index, segment and conf DBs.
+ *
+ * @param lastIndexKept last log index that must remain
+ * @return {@code true} if, after truncation, the index DB's last log index equals {@code lastIndexKept};
+ *     {@code false} if the index DB has no position for {@code lastIndexKept} or the indexes differ
+ */
+ @Override
+ public boolean truncateSuffix(final long lastIndexKept) {
+ // Truncation positions for the segment DB (first) and the conf DB (second), looked up from the
+ // first index that is going to be removed.
+ final Pair<Integer, Integer> truncatePositions = this.indexDB.lookupFirstLogPosFromLogIndex(lastIndexKept + 1);
+ final int segmentTruncatePos = truncatePositions.getFirst();
+ final int confTruncatePos = truncatePositions.getSecond();
+ final int keptEntryPos = this.indexDB.lookupIndex(lastIndexKept).getPosition();
+ if (keptEntryPos == -1) {
+ return false;
+ }
+
+ // Index DB first, then the segment DB and the conf DB.
+ this.indexDB.truncateSuffix(lastIndexKept, 0);
+ this.segmentLogDB.truncateSuffix(lastIndexKept, segmentTruncatePos);
+ this.confDB.truncateSuffix(lastIndexKept, confTruncatePos);
+
+ return this.indexDB.getLastLogIndex() == lastIndexKept;
+ }
+
+ /**
+ * Resets the three DBs so that the log starts at {@code nextLogIndex}, then appends one entry at that index:
+ * the entry that was stored there before the reset, or a NO_OP entry with term 0 if there was none.
+ * Runs under the write lock.
+ *
+ * @param nextLogIndex the index the log starts at after the reset
+ * @return the result of appending the entry at {@code nextLogIndex}
+ */
+ @Override
+ public boolean reset(final long nextLogIndex) {
+ this.writeLock.lock();
+ try {
+ // Read the existing entry before the DBs are reset, while it can still be looked up.
+ LogEntry entry = getEntry(nextLogIndex);
+ this.indexDB.reset(nextLogIndex);
+ this.segmentLogDB.reset(nextLogIndex);
+ this.confDB.reset(nextLogIndex);
+ if (entry == null) {
+ entry = new LogEntry();
+ entry.setType(EnumOutter.EntryType.ENTRY_TYPE_NO_OP);
+ entry.setId(new LogId(nextLogIndex, 0));
+ }
+ // NOTE(review): presumably -1 marks the checkpoint as uninitialized so that the append below records
+ // the new first index - confirm against FirstLogIndexCheckpoint#isInit. The result is ignored here.
+ saveFirstLogIndex(-1);
+ return appendEntry(entry);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Shuts down the index, segment and conf DBs under the write lock. Exceptions are logged, not propagated;
+ * if one DB throws, the DBs after it are not shut down.
+ */
+ @Override
+ public void shutdown() {
+ this.writeLock.lock();
+ try {
+ this.indexDB.shutdown();
+ this.segmentLogDB.shutdown();
+ this.confDB.shutdown();
+ } catch (final Exception e) {
+ LOG.error("Error on shutdown dbs", e);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @OnlyForTest
+ public IndexDB getIndexDB() {
+ return indexDB;
+ }
+
+ @OnlyForTest
+ public ConfDB getConfDB() {
+ return confDB;
+ }
+
+ @OnlyForTest
+ public SegmentLogDB getSegmentLogDB() {
+ return segmentLogDB;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
new file mode 100644
index 0000000000..7477abb61d
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/AbstractDB.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import java.io.IOException;
+import java.nio.file.Path;import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.raft.jraft.Lifecycle;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile.RecoverResult;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.AbortFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.FlushStatusCheckpoint;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.ServiceManager;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+
+/**
+ * Parent class of the DBs: it delegates to the fileManager and the serviceManager
+ * and wraps common operations such as recover().
+ */
+public abstract class AbstractDB implements Lifecycle<LogStoreFactory> {
+ private static final IgniteLogger LOG = Loggers.forClass(AbstractDB.class);
+
+ private static final String FLUSH_STATUS_CHECKPOINT = "FlushStatusCheckpoint";
+ private static final String ABORT_FILE = "Abort";
+
+ protected final String storePath;
+ protected FileManager fileManager;
+ protected ServiceManager serviceManager;
+ protected LogStoreFactory logStoreFactory;
+ protected StoreOptions storeOptions;
+ protected AbortFile abortFile;
+ protected FlushStatusCheckpoint flushStatusCheckpoint;
+ private ScheduledExecutorService checkpointExecutor;
+
+ protected AbstractDB(final String storePath) {
+ this.storePath = storePath;
+ }
+
+ /**
+ * Initializes the DB: creates the flush-status checkpoint and abort file objects under the store path,
+ * creates and initializes the service manager, creates the file manager, and starts a single-threaded
+ * scheduler that runs the checkpoint task at a fixed rate.
+ *
+ * @param logStoreFactory factory that supplies the options, the service manager and the file manager
+ * @return {@code false} if the service manager fails to initialize (the file manager and the scheduler are
+ *     not created in that case), {@code true} otherwise
+ */
+ @Override
+ public boolean init(final LogStoreFactory logStoreFactory) {
+ this.logStoreFactory = logStoreFactory;
+ this.storeOptions = logStoreFactory.getStoreOptions();
+ final String flushStatusCheckpointPath = Paths.get(this.storePath, FLUSH_STATUS_CHECKPOINT).toString();
+ final String abortFilePath = Paths.get(this.storePath, ABORT_FILE).toString();
+ this.flushStatusCheckpoint = new FlushStatusCheckpoint(flushStatusCheckpointPath, logStoreFactory.getRaftOptions());
+ this.abortFile = new AbortFile(abortFilePath);
+ this.serviceManager = logStoreFactory.newServiceManager(this);
+ if (!this.serviceManager.init(logStoreFactory)) {
+ return false;
+ }
+ this.fileManager = logStoreFactory.newFileManager(getDBFileType(), this.storePath,
+ this.serviceManager.getAllocateService());
+ this.checkpointExecutor = Executors
+ .newSingleThreadScheduledExecutor(new NamedThreadFactory(getDBName() + "-Checkpoint-Thread-", LOG));
+ // The configured interval is used both as the initial delay and as the period, in milliseconds.
+ final int interval = this.storeOptions.getCheckpointFlushStatusInterval();
+ this.checkpointExecutor.scheduleAtFixedRate(this::doCheckpoint, interval, interval, TimeUnit.MILLISECONDS);
+ return true;
+ }
+
+ /**
+ * Shuts the DB down: stops the periodic checkpoint task, writes a final checkpoint, shuts down the service
+ * manager and the file manager and removes the abort file.
+ *
+ * <p>Safe to call when {@code init} did not complete: every component is skipped if it was never created.
+ */
+ @Override
+ public void shutdown() {
+ // Stop the scheduler first so that no periodic checkpoint is started once the file manager is going
+ // down, and so that the checkpoint thread is released even if a later step throws.
+ if (this.checkpointExecutor != null) {
+ this.checkpointExecutor.shutdown();
+ }
+ // The final checkpoint needs the file manager, which does not exist if init failed early.
+ if (this.fileManager != null) {
+ doCheckpoint();
+ }
+ if (this.serviceManager != null) {
+ this.serviceManager.shutdown();
+ }
+ if (this.fileManager != null) {
+ this.fileManager.shutdown();
+ }
+ if (this.abortFile != null) {
+ this.abortFile.destroy();
+ }
+ }
+
+ /**
+ * @return this db's name
+ */
+ public String getDBName() {
+ return getClass().getSimpleName();
+ }
+
+ /**
+ * @return this db's file type (index or segmentLog or conf)
+ */
+ public abstract FileType getDBFileType();
+
+ /**
+ * @return this db's file size
+ */
+ public abstract int getDBFileSize();
+
+ /**
+ * Iterator over the log entries stored in a sequence of segment files. When the current file yields no
+ * more data, reading continues right after the header of the next file.
+ *
+ * <p>{@link #next()} returns {@code null} when nothing is left, so {@link #hasNext()} returning
+ * {@code true} does not guarantee a non-null entry.
+ */
+ public static class LogEntryIterator implements Iterator<LogEntry> {
+ private final AbstractFile[] files;
+ private int currentReadPos;
+ private int preReadPos;
+ private int currentFileId;
+ private final LogEntryDecoder logEntryDecoder;
+
+ /**
+ * Creates an iterator over {@code files}.
+ *
+ * @param files target files, read in array order
+ * @param logEntryDecoder decoder applied to every record that is read
+ * @param currentReadPos the beginning read position in the first file; values below the file header size
+ *     are raised to the header size
+ */
+ public LogEntryIterator(final AbstractFile[] files, final LogEntryDecoder logEntryDecoder,
+ final int currentReadPos) {
+ this.files = files;
+ this.logEntryDecoder = logEntryDecoder;
+ final boolean noFiles = files.length == 0;
+ // -1 in both fields marks an iterator that has nothing to read.
+ this.currentFileId = noFiles ? -1 : 0;
+ this.currentReadPos = noFiles ? -1 : Math.max(currentReadPos, FileHeader.HEADER_SIZE);
+ }
+
+ /**
+ * @return {@code true} while the current file id still points into the file array
+ */
+ @Override
+ public boolean hasNext() {
+ final int fileId = this.currentFileId;
+ return 0 <= fileId && fileId < this.files.length;
+ }
+
+ /**
+ * @return the next decoded entry, or {@code null} if there are no files, all files are exhausted, or the
+ *     current array slot is {@code null}
+ */
+ @Override
+ public LogEntry next() {
+ if (this.currentFileId == -1) {
+ return null;
+ }
+ while (this.currentFileId < this.files.length) {
+ final SegmentFile current = (SegmentFile) this.files[this.currentFileId];
+ if (current == null) {
+ return null;
+ }
+ final byte[] payload = current.lookupData(this.currentReadPos);
+ if (payload != null) {
+ // Remember where this entry started before moving past it.
+ this.preReadPos = this.currentReadPos;
+ this.currentReadPos += SegmentFile.getWriteBytes(payload);
+ return this.logEntryDecoder.decode(payload);
+ }
+ // Reach file end: continue right after the header of the next file.
+ this.currentFileId += 1;
+ this.currentReadPos = FileHeader.HEADER_SIZE;
+ }
+ return null;
+ }
+
+ /**
+ * @return the position at which the entry most recently returned by {@link #next()} started
+ */
+ public int getReadPosition() {
+ return this.preReadPos;
+ }
+ }
+
+ /**
+ * Creates an iterator over the files that the file manager returns for {@code beginIndex}.
+ *
+ * @param logEntryDecoder decoder for the stored entries
+ * @param beginIndex log index used to select the files
+ * @param beginPosition read position in the first of those files
+ * @return a new iterator
+ */
+ public LogEntryIterator iterator(final LogEntryDecoder logEntryDecoder, long beginIndex, int beginPosition) {
+ final AbstractFile[] files = this.fileManager.findFileFromLogIndex(beginIndex);
+ return new LogEntryIterator(files, logEntryDecoder, beginPosition);
+ }
+
+ /**
+ * Creates an iterator over a copy of the file manager's file array, starting right after the header of
+ * the first file (the iterator raises position 0 to the header size).
+ *
+ * @param logEntryDecoder decoder for the stored entries
+ * @return a new iterator
+ */
+ public LogEntryIterator iterator(final LogEntryDecoder logEntryDecoder) {
+ final AbstractFile[] files = this.fileManager.copyFiles();
+ return new LogEntryIterator(files, logEntryDecoder, 0);
+ }
+
+ /**
+ * Recovers the DB files at start-up.
+ *
+ * <p>With no existing files, the flushed position is set to 0 and the abort file is created. Otherwise the
+ * flush-status checkpoint is loaded and the presence of the abort file decides where recovery starts:
+ * if it exists (abnormal exit), recovery starts from the file named in the checkpoint; if not (normal exit),
+ * only the last file is recovered. The service manager is started in every case, including on failure.
+ *
+ * @throws RuntimeException wrapping any exception raised while recovering
+ */
+ public synchronized void recover() {
+ final List<AbstractFile> files = this.fileManager.loadExistedFiles();
+ try {
+ if (files.isEmpty()) {
+ this.fileManager.setFlushedPosition(0);
+ this.abortFile.create();
+ return;
+ }
+ this.flushStatusCheckpoint.load();
+ final boolean normalExit = !this.abortFile.exists();
+ long recoverOffset;
+ int startRecoverIndex;
+ if (!normalExit) {
+ // Abnormal exit, should recover from lastCheckpointFile
+ startRecoverIndex = findLastCheckpointFile(files, this.flushStatusCheckpoint);
+ LOG.info("{} {} did not exit normally, will try to recover files from fileIndex:{}.", getDBName(),
+ this.storePath, startRecoverIndex);
+ } else {
+ // Normal exit , just recover last file
+ startRecoverIndex = files.size() - 1;
+ }
+ // Files before startRecoverIndex count as full; computed in long to avoid int overflow.
+ recoverOffset = (long) startRecoverIndex * (long) getDBFileSize();
+ recoverOffset = recoverFiles(startRecoverIndex, files, recoverOffset);
+ this.fileManager.setFlushedPosition(recoverOffset);
+
+ if (normalExit) {
+ this.abortFile.create();
+ } else {
+ this.abortFile.touch();
+ }
+ } catch (final Exception e) {
+ LOG.error("Error on recover {} files , store path: {} , {}", getDBName(), this.storePath, e);
+ throw new RuntimeException(e);
+ } finally {
+ startServiceManager();
+ }
+ }
+
+ /**
+ * Recovers the given files in order and accumulates the offset up to which data is valid.
+ *
+ * <p>Files before {@code startRecoverIndex} are not recovered; their positions are set to the full file
+ * size. Every later file is recovered: a fully recovered file adds the file size (or its last offset if it
+ * is the last file), a partially recovered file adds its last offset. A partial or failed recovery
+ * truncates the files to the accumulated offset and stops processing.
+ *
+ * @param startRecoverIndex index in {@code files} of the first file to recover
+ * @param files all existing files, in order
+ * @param processOffset offset accumulated so far, i.e. the start offset of the first file to recover
+ * @return last recover offset
+ */
+ protected long recoverFiles(final int startRecoverIndex, final List<AbstractFile> files, long processOffset) {
+ AbstractFile preFile = null;
+ boolean needTruncate = false;
+ for (int index = 0; index < files.size(); index++) {
+ final AbstractFile file = files.get(index);
+ final boolean isLastFile = index == files.size() - 1;
+
+ if (index < startRecoverIndex) {
+ // Update files' s position when don't need to recover
+ file.updateAllPosition(getDBFileSize());
+ } else {
+ final RecoverResult result = file.recover();
+ if (result.recoverSuccess()) {
+ if (result.recoverTotal()) {
+ processOffset += isLastFile ? result.getLastOffset() : getDBFileSize();
+ } else {
+ processOffset += result.getLastOffset();
+ needTruncate = true;
+ }
+ } else {
+ needTruncate = true;
+ }
+ }
+
+ // The previous file ends right before the first log index of the current file.
+ if (preFile != null) {
+ preFile.setLastLogIndex(file.getFirstLogIndex() - 1);
+ }
+ preFile = file;
+
+ if (needTruncate) {
+ // Error on recover files , truncate to processOffset
+ LOG.warn("Try to truncate files to processOffset:{} when recover files", processOffset);
+ this.fileManager.truncateSuffixByOffset(processOffset);
+ break;
+ }
+ }
+ return processOffset;
+ }
+
+ /**
+ * Finds the position in {@code files} of the file named in the flush-status checkpoint.
+ *
+ * @param files existing files, in order
+ * @param checkpoint loaded checkpoint, may be {@code null}
+ * @return index of the first file whose name matches the checkpoint's file name (case-insensitive), or
+ *     {@code 0} if there is no checkpoint, no file name, or no match
+ */
+ private int findLastCheckpointFile(final List<AbstractFile> files, final FlushStatusCheckpoint checkpoint) {
+ final boolean hasName = checkpoint != null && checkpoint.fileName != null;
+ if (!hasName) {
+ return 0;
+ }
+ int position = 0;
+ for (final AbstractFile candidate : files) {
+ if (getFileName(candidate).equalsIgnoreCase(checkpoint.fileName)) {
+ return position;
+ }
+ position++;
+ }
+ // No match: recover everything from the first file.
+ return 0;
+ }
+
+ private static String getFileName(AbstractFile file) {
+ return Path.of(file.getFilePath()).getFileName().toString();
+ }
+
+ /**
+ * Saves the current flush status (file name, flushed position, last log index) to the flush-status
+ * checkpoint. Nothing is saved when the file manager finds no file for the flushed position. An I/O failure
+ * while saving is logged together with its cause and is not propagated.
+ */
+ private void doCheckpoint() {
+ long flushedPosition = getFlushedPosition();
+ // A position that is an exact multiple of the file size is moved back by one before the file lookup.
+ if (flushedPosition % getDBFileSize() == 0) {
+ flushedPosition -= 1;
+ }
+ final AbstractFile file = this.fileManager.findFileByOffset(flushedPosition, false);
+ try {
+ if (file != null) {
+ this.flushStatusCheckpoint.setFileName(getFileName(file));
+ this.flushStatusCheckpoint.setFlushPosition(flushedPosition);
+ this.flushStatusCheckpoint.setLastLogIndex(getLastLogIndex());
+ this.flushStatusCheckpoint.save();
+ }
+ } catch (final IOException e) {
+ // Pass the exception to the logger; without it the cause of a failed checkpoint is lost.
+ LOG.error("Error when do checkpoint in db:" + getDBName(), e);
+ }
+ }
+
+ /**
+ * Writes the data to the last segment file without waiting for a flush and returns where it was written.
+ *
+ * @param logIndex the log index of the entry
+ * @param data logEntry data
+ * @return (wrotePosition, expectFlushPosition): the position inside the segment file and the overall
+ *     position that must be flushed for this entry to be durable; {@code (-1, -1)} if the file manager
+ *     provided no file
+ */
+ public Pair<Integer, Long> appendLogAsync(final long logIndex, final byte[] data) {
+ final int waitToWroteSize = SegmentFile.getWriteBytes(data);
+ final SegmentFile segmentFile = (SegmentFile) this.fileManager.getLastFile(logIndex, waitToWroteSize, true);
+ if (segmentFile != null) {
+ final int pos = segmentFile.appendData(logIndex, data);
+ // File start offset + position in file + record size = end of this record across all files.
+ final long expectFlushPosition = segmentFile.getFileFromOffset() + pos + waitToWroteSize;
+ return Pair.of(pos, expectFlushPosition);
+ }
+ return Pair.of(-1, (long) -1);
+ }
+
+ /**
+ * Reads the log stored for {@code logIndex} at {@code pos} from its segment file.
+ *
+ * @param logIndex the log index
+ * @param pos the position to read inside the segment file
+ * @return the data, or {@code null} if no file holds the index or the position lies beyond the flushed
+ *     position
+ */
+ public byte[] lookupLog(final long logIndex, final int pos) {
+ final SegmentFile target = (SegmentFile) this.fileManager.findFileByLogIndex(logIndex, false);
+ if (target == null) {
+ return null;
+ }
+ // Only positions that are not beyond the flushed position are served.
+ final long absolutePosition = target.getFileFromOffset() + pos;
+ final boolean flushed = absolutePosition <= getFlushedPosition();
+ return flushed ? target.lookupData(logIndex, pos) : null;
+ }
+
+ /**
+ * Flushes the db files repeatedly until flushPosition >= maxExpectedFlushPosition.
+ *
+ * @param maxExpectedFlushPosition position the flushed position has to reach
+ * @param maxFlushTimes number of flush attempts after which the wait gives up
+ * @return true if flushPosition >= maxExpectedFlushPosition, false if the attempts were used up first
+ */
+ public boolean waitForFlush(final long maxExpectedFlushPosition, final int maxFlushTimes) {
+ for (int attempt = 1; getFlushedPosition() < maxExpectedFlushPosition; attempt++) {
+ flush();
+ if (attempt > maxFlushTimes) {
+ LOG.error("Try flush db {} times, but the flushPosition {} can't exceed expectedFlushPosition {}",
+ maxFlushTimes, getFlushedPosition(), maxExpectedFlushPosition);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void startServiceManager() {
+ this.serviceManager.start();
+ }
+
+ public boolean flush() {
+ return this.fileManager.flush();
+ }
+
+ /**
+ * Delegates prefix truncation to the file manager.
+ *
+ * @param firstIndexKept first log index that must remain
+ * @return the file manager's result
+ */
+ public boolean truncatePrefix(final long firstIndexKept) {
+ return this.fileManager.truncatePrefix(firstIndexKept);
+
+ }
+
+ /**
+ * Truncates everything after {@code lastIndexKept} and, on success, writes a new flush-status checkpoint.
+ *
+ * @param lastIndexKept last log index that must remain
+ * @param pos truncate position handed to the file manager
+ * @return {@code true} if the file manager truncated successfully, {@code false} otherwise
+ */
+ public boolean truncateSuffix(final long lastIndexKept, final int pos) {
+ if (this.fileManager.truncateSuffix(lastIndexKept, pos)) {
+ doCheckpoint();
+ // Report the success; callers that check the result would otherwise always see a failure.
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Resets the DB: destroys the flush-status checkpoint, resets the file manager to {@code nextLogIndex} and
+ * runs a checkpoint.
+ *
+ * @param nextLogIndex log index handed to the file manager's reset
+ * @return always {@code true}
+ */
+ public boolean reset(final long nextLogIndex) {
+ this.flushStatusCheckpoint.destroy();
+ this.fileManager.reset(nextLogIndex);
+ doCheckpoint();
+ return true;
+ }
+
+ public long getFlushedPosition() {
+ return this.fileManager.getFlushedPosition();
+ }
+
+ public StoreOptions getStoreOptions() {
+ return this.storeOptions;
+ }
+
+ public String getStorePath() {
+ return this.storePath;
+ }
+
+ public long getFirstLogIndex() {
+ return this.fileManager.getFirstLogIndex();
+ }
+
+ public long getLastLogIndex() {
+ return this.fileManager.getLastLogIndex();
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
new file mode 100644
index 0000000000..a79ee0ace3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/ConfDB.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+
+/**
+ * DB that stores configuration type log entry.
+ *
+ * <p>Specializes {@link AbstractDB} only by the file type and the file size it reports.
+ */
+public class ConfDB extends AbstractDB {
+
+ /**
+  * Creates the db for the given store path.
+  *
+  * @param storePath path handed over to the parent class
+  */
+ public ConfDB(final String storePath) {
+ super(storePath);
+ }
+
+ /**
+  * Returns {@link FileType#FILE_CONFIGURATION}.
+  */
+ @Override
+ public FileType getDBFileType() {
+ return FileType.FILE_CONFIGURATION;
+ }
+
+ /**
+  * Returns the configuration file size taken from the store options.
+  */
+ @Override
+ public int getDBFileSize() {
+ return this.storeOptions.getConfFileSize();
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
new file mode 100644
index 0000000000..012e740bb1
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/IndexDB.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import java.util.List;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+
+/**
+ * DB that stores index entry
+ */
+public class IndexDB extends AbstractDB {
+
+ /**
+  * Creates the index db for the given store path.
+  *
+  * @param storePath path handed over to the parent class
+  */
+ public IndexDB(final String storePath) {
+ super(storePath);
+ }
+
+ /**
+  * Append IndexEntry(logIndex , position, indexType).
+  *
+  * <p>The entry is accepted only when the db reports no last log index yet (-1) or when
+  * {@code logIndex} directly follows the current last log index.
+  *
+  * @param logIndex log index of the entry
+  * @param position position stored in the index entry
+  * @param type index type; its {@code getType()} value is what gets written
+  * @return (wrotePosition, expectFlushPosition), or (-1, -1) when the index is out of sequence or no
+  *     file could be obtained from the file manager
+  */
+ public Pair<Integer, Long> appendIndexAsync(final long logIndex, final int position, final IndexType type) {
+     // Read the last index once so that both halves of the check see the same value.
+     final long lastIndex = getLastLogIndex();
+     if (lastIndex != -1 && logIndex != lastIndex + 1) {
+         return Pair.of(-1, (long) -1);
+     }
+     final int waitToWroteSize = IndexEntry.INDEX_SIZE;
+     final IndexFile indexFile = (IndexFile) this.fileManager.getLastFile(logIndex, waitToWroteSize, true);
+     if (indexFile == null) {
+         return Pair.of(-1, (long) -1);
+     }
+     final int pos = indexFile.appendIndex(logIndex, position, type.getType());
+     // The flush has to pass the end of the entry that was just written.
+     final long expectFlushPosition = indexFile.getFileFromOffset() + pos + waitToWroteSize;
+     return Pair.of(pos, expectFlushPosition);
+ }
+
+ /**
+  * Append IndexEntryArray(logIndex , position, indexType).
+  *
+  * <p>Entries whose log index does not directly follow the current last log index are skipped.
+  *
+  * @param indexArray index entries to append, visited in list order
+  * @return max expectFlushPosition over the appended entries, -1 if nothing was appended
+  */
+ public Long appendBatchIndexAsync(final List<IndexEntry> indexArray) {
+     long maxFlushPosition = -1;
+     for (final IndexEntry entry : indexArray) {
+         final long logIndex = entry.getLogIndex();
+         if (logIndex != getLastLogIndex() + 1) {
+             continue;
+         }
+         final int position = entry.getPosition();
+         final IndexType type = entry.getLogType() == 1 ? IndexType.IndexSegment : IndexType.IndexConf;
+         final Pair<Integer, Long> flushPair = appendIndexAsync(logIndex, position, type);
+         maxFlushPosition = Math.max(maxFlushPosition, flushPair.getSecond());
+     }
+     return maxFlushPosition;
+ }
+
+ /**
+  * Lookup IndexEntry by logIndex.
+  *
+  * @param logIndex log index to look up
+  * @return the entry read from the index file, or {@code IndexEntry.newInstance()} when no file holds
+  *     the index or the entry lies beyond the flushed position
+  */
+ public IndexEntry lookupIndex(final long logIndex) {
+     final IndexFile indexFile = (IndexFile) this.fileManager.findFileByLogIndex(logIndex, false);
+     if (indexFile == null) {
+         return IndexEntry.newInstance();
+     }
+     final int offsetInFile = indexFile.calculateIndexPos(logIndex);
+     final long absolutePosition = indexFile.getFileFromOffset() + offsetInFile;
+     return absolutePosition <= getFlushedPosition() ? indexFile.lookupIndex(logIndex) : IndexEntry.newInstance();
+ }
+
+ /**
+  * Search the first segment log index and first conf log index from logIndex.
+  *
+  * <p>Walks forward from {@code logIndex} up to the last log index and stops early once both
+  * positions are greater than zero.
+  *
+  * @param logIndex log index to start the scan from
+  * @return pair of first log pos(segment / conf); a component stays -1 when no entry of that type was seen
+  */
+ public Pair<Integer, Integer> lookupFirstLogPosFromLogIndex(final long logIndex) {
+     final long lastLogIndex = getLastLogIndex();
+     int firstSegmentPos = -1;
+     int firstConfPos = -1;
+     for (long index = logIndex; index <= lastLogIndex && (firstSegmentPos <= 0 || firstConfPos <= 0); index++) {
+         final IndexEntry entry = lookupIndex(index);
+         if (firstSegmentPos == -1 && entry.getLogType() == IndexType.IndexSegment.getType()) {
+             firstSegmentPos = entry.getPosition();
+         } else if (firstConfPos == -1 && entry.getLogType() == IndexType.IndexConf.getType()) {
+             firstConfPos = entry.getPosition();
+         }
+     }
+     return Pair.of(firstSegmentPos, firstConfPos);
+ }
+
+ /**
+  * Search the last segment log index and last conf log index from tail.
+  *
+  * <p>Walks backwards from the last log index down to the first one. Every entry that is looked up
+  * gets its log index set to the index being visited.
+  *
+  * @return pair of IndexEntry (last log index and position) for segment / conf; a component is null
+  *     when no entry of that type was seen
+  */
+ public Pair<IndexEntry, IndexEntry> lookupLastLogIndexAndPosFromTail() {
+     final long lastLogIndex = getLastLogIndex();
+     final long firstLogIndex = getFirstLogIndex();
+     IndexEntry lastSegmentIndex = null;
+     IndexEntry lastConfIndex = null;
+     for (long index = lastLogIndex; index >= firstLogIndex && (lastSegmentIndex == null || lastConfIndex == null); index--) {
+         final IndexEntry entry = lookupIndex(index);
+         entry.setLogIndex(index);
+         if (lastSegmentIndex == null && entry.getLogType() == IndexType.IndexSegment.getType()) {
+             lastSegmentIndex = entry;
+         } else if (lastConfIndex == null && entry.getLogType() == IndexType.IndexConf.getType()) {
+             lastConfIndex = entry;
+         }
+     }
+     return Pair.of(lastSegmentIndex, lastConfIndex);
+ }
+
+ /**
+  * Returns {@link FileType#FILE_INDEX}.
+  */
+ @Override
+ public FileType getDBFileType() {
+ return FileType.FILE_INDEX;
+ }
+
+ /**
+  * Returns the index file size taken from the store options.
+  */
+ @Override
+ public int getDBFileSize() {
+ return this.storeOptions.getIndexFileSize();
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
new file mode 100644
index 0000000000..370194bd47
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/db/SegmentLogDB.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.db;
+
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+
+/**
+ * DB that stores logEntry.
+ *
+ * <p>Specializes {@link AbstractDB} only by the file type and the file size it reports.
+ */
+public class SegmentLogDB extends AbstractDB {
+
+ /**
+  * Creates the db for the given store path.
+  *
+  * @param storePath path handed over to the parent class
+  */
+ public SegmentLogDB(final String storePath) {
+ super(storePath);
+ }
+
+ /**
+  * Returns {@link FileType#FILE_SEGMENT}.
+  */
+ @Override
+ public FileType getDBFileType() {
+ return FileType.FILE_SEGMENT;
+ }
+
+ /**
+  * Returns the segment file size taken from the store options.
+  */
+ @Override
+ public int getDBFileSize() {
+ return this.storeOptions.getSegmentFileSize();
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
new file mode 100644
index 0000000000..8b262ddfc3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/factory/LogStoreFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.factory;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.ServiceManager;
+
+/**
+ * The factory that provides uniform construction functions for files, file managers and services
+ * of the log store.
+ */
+public class LogStoreFactory {
+    private final StoreOptions storeOptions;
+    private final RaftOptions raftOptions;
+
+    /**
+     * Creates the factory.
+     *
+     * @param opts store options used to size the created files
+     * @param raftOptions raft options exposed through {@link #getRaftOptions()}
+     */
+    public LogStoreFactory(final StoreOptions opts, RaftOptions raftOptions) {
+        this.storeOptions = opts;
+        this.raftOptions = raftOptions;
+    }
+
+    /**
+     * Create new file(index/segment/conf).
+     *
+     * @param fileType kind of file to create
+     * @param filePath path of the file
+     * @return an {@link IndexFile} for the index type, otherwise a {@link SegmentFile} sized for conf or segment
+     */
+    public AbstractFile newFile(final FileType fileType, final String filePath) {
+        if (isIndex(fileType)) {
+            return new IndexFile(filePath, this.storeOptions.getIndexFileSize());
+        }
+        if (isConf(fileType)) {
+            return new SegmentFile(filePath, this.storeOptions.getConfFileSize());
+        }
+        return new SegmentFile(filePath, this.storeOptions.getSegmentFileSize());
+    }
+
+    /**
+     * Create new fileManager(index/segment/conf).
+     *
+     * @param fileType kind of files the manager handles
+     * @param storePath store path of the manager
+     * @param allocateService allocate service handed to the manager
+     * @return the built file manager
+     */
+    public FileManager newFileManager(final FileType fileType, final String storePath,
+        final AllocateFileService allocateService) {
+        return FileManager.newBuilder()
+            .fileType(fileType)
+            .fileSize(getFileSize(fileType))
+            .storePath(storePath)
+            .logStoreFactory(this)
+            .allocateService(allocateService)
+            .build();
+    }
+
+    /**
+     * Create new serviceManager.
+     */
+    public ServiceManager newServiceManager(final AbstractDB abstractDB) {
+        return new ServiceManager(abstractDB);
+    }
+
+    /**
+     * Create new allocateFileService.
+     */
+    public AllocateFileService newAllocateService(final AbstractDB abstractDB) {
+        return new AllocateFileService(abstractDB, this);
+    }
+
+    /**
+     * Returns the configured file size for the given file type; any type that is neither index nor
+     * configuration gets the segment file size.
+     */
+    public int getFileSize(final FileType fileType) {
+        if (isIndex(fileType)) {
+            return this.storeOptions.getIndexFileSize();
+        }
+        if (isConf(fileType)) {
+            return this.storeOptions.getConfFileSize();
+        }
+        return this.storeOptions.getSegmentFileSize();
+    }
+
+    private boolean isIndex(final FileType fileType) {
+        return FileType.FILE_INDEX == fileType;
+    }
+
+    private boolean isConf(final FileType fileType) {
+        return FileType.FILE_CONFIGURATION == fileType;
+    }
+
+    /**
+     * Returns the store options this factory was created with.
+     */
+    public StoreOptions getStoreOptions() {
+        return this.storeOptions;
+    }
+
+    /**
+     * Returns the raft options this factory was created with.
+     */
+    public RaftOptions getRaftOptions() {
+        return this.raftOptions;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
new file mode 100644
index 0000000000..ac0e4d36a6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/AbstractFile.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.storage.logit.LibC;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ReferenceResource;
+import org.apache.ignite.raft.jraft.util.Platform;
+import org.apache.ignite.raft.jraft.util.Utils;
+
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+
+import sun.nio.ch.DirectBuffer;
+
+/**
+ * File parent class that wrappers uniform functions such as mmap(), flush() etc..
+ */
+public abstract class AbstractFile extends ReferenceResource {
+ private static final IgniteLogger LOG = Loggers.forClass(AbstractFile.class);
+
+ protected static final int BLANK_HOLE_SIZE = 64;
+
+ protected static final byte FILE_END_BYTE = 'x';
+
+ protected String filePath;
+
+ // The size of this file
+ protected int fileSize;
+
+ protected File file;
+
+ protected FileHeader header;
+
+ protected MappedByteBuffer mappedByteBuffer;
+
+ // Current write position
+ protected final AtomicInteger wrotePosition = new AtomicInteger(0);
+
+ // Current flush position
+ protected final AtomicInteger flushedPosition = new AtomicInteger(0);
+
+ protected final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ protected final Lock readLock = this.readWriteLock.readLock();
+ protected final Lock writeLock = this.readWriteLock.writeLock();
+ private volatile boolean isMapped = false;
+ private final ReentrantLock mapLock = new ReentrantLock();
+
+ /**
+  * Creates the file object: runs {@link #initAndMap} first and then installs a fresh header.
+  *
+  * @param filePath path of the backing file
+  * @param fileSize size of this file
+  * @param isMapped whether the file should be mapped (read-write) right away
+  */
+ public AbstractFile(final String filePath, final int fileSize, final boolean isMapped) {
+ initAndMap(filePath, fileSize, isMapped);
+ this.header = new FileHeader();
+ }
+
+ /**
+  * Records path and size, creates the backing file when it does not exist yet and, if requested,
+  * maps it in read-write mode. Any failure is logged and not propagated to the caller.
+  *
+  * @param filePath path of the backing file
+  * @param fileSize size of this file
+  * @param isMapped whether to map the file right away
+  */
+ public void initAndMap(final String filePath, final int fileSize, final boolean isMapped) {
+     this.filePath = filePath;
+     this.fileSize = fileSize;
+     this.file = new File(filePath);
+     this.mapLock.lock();
+     try {
+         final boolean created = this.file.exists() || this.file.createNewFile();
+         if (!created) {
+             throw new RuntimeException("Failed to create new file");
+         }
+         if (isMapped) {
+             this.map(MapMode.READ_WRITE);
+         }
+     } catch (final Throwable t) {
+         // Deliberately not rethrown: the failure only shows up in the log.
+         LOG.error("Error happen when create file:{}", filePath, t);
+     } finally {
+         this.mapLock.unlock();
+     }
+ }
+
+ /**
+  * Maps the file in read-only mode unless it is already mapped.
+  */
+ public void mapInIfNecessary() {
+     if (isMapped()) {
+         return;
+     }
+     this.map(MapMode.READ_ONLY);
+ }
+
+ public void map(final MapMode mapMode) {
+ this.mapLock.lock();
+ try {
+ if (!isMapped()) {
+ try (final RandomAccessFile randomAccessFile = new RandomAccessFile(this.file, "rw");
+ final FileChannel fileChannel = randomAccessFile.getChannel()) {
+ this.mappedByteBuffer = fileChannel.map(mapMode, 0, this.fileSize);
+ this.isMapped = true;
+ }
+ }
+ } catch (final Throwable t) {
+ LOG.error("map file {} failed , {}", getFilePath(), t);
+ throw new RuntimeException(t);
+ } finally {
+ this.mapLock.unlock();
+ }
+ }
+
+ /**
+  * Forces the mapped buffer, advances the flushed position to the wrote position and unmaps the
+  * buffer. Does nothing when the file is not mapped.
+  *
+  * @throws RuntimeException wrapping any failure raised while forcing or unmapping
+  */
+ public void unmmap() {
+     // Unlocked check first; the state is checked again once the lock is held.
+     if (!isMapped()) {
+         return;
+     }
+     this.mapLock.lock();
+     try {
+         if (!isMapped()) {
+             return;
+         }
+         this.mappedByteBuffer.force();
+         this.flushedPosition.set(getWrotePosition());
+         if (this.mappedByteBuffer != null) {
+             if (Platform.isLinux()) {
+                 hintUnload();
+             }
+             Utils.unmap(this.mappedByteBuffer);
+         }
+         this.isMapped = false;
+     } catch (final Throwable t) {
+         LOG.error("error unmap file {} , {}", getFilePath(), t);
+         throw new RuntimeException(t);
+     } finally {
+         this.mapLock.unlock();
+     }
+ }
+
+ /**
+  * Returns whether the file is currently flagged as mapped.
+  */
+ public boolean isMapped() {
+ return this.isMapped;
+ }
+
+ /**
+  * Immutable outcome of {@link #recover()}: success flag, completeness flag and last recovered offset.
+  */
+ public static class RecoverResult {
+ // Is recover success
+ private final boolean recoverSuccess;
+ // Is recover total data or encounter an error when recover
+ private final boolean recoverTotal;
+ // Last recover offset
+ private final int lastOffset;
+
+ /**
+  * Creates the result from its three components.
+  */
+ public RecoverResult(final boolean recoverSuccess, final boolean recoverTotal, final int lastOffset) {
+ this.recoverSuccess = recoverSuccess;
+ this.recoverTotal = recoverTotal;
+ this.lastOffset = lastOffset;
+ }
+
+ /**
+  * Static factory that forwards to the constructor.
+  */
+ public static RecoverResult newInstance(final boolean isSuccess, final boolean isRecoverTotal,
+ final int recoverOffset) {
+ return new RecoverResult(isSuccess, isRecoverTotal, recoverOffset);
+ }
+
+ /**
+  * Returns the last recovered offset.
+  */
+ public int getLastOffset() {
+ return lastOffset;
+ }
+
+ /**
+  * Returns whether the recovery succeeded.
+  */
+ public boolean recoverSuccess() {
+ return recoverSuccess;
+ }
+
+ /**
+  * Returns whether the whole file was recovered.
+  */
+ public boolean recoverTotal() {
+ return recoverTotal;
+ }
+ }
+
+ /**
+  * Recover logic: loads the header, then checks entry after entry starting right behind the header
+  * until the file end marker or a failed check is met. Wrote and flushed positions are moved to the
+  * position where the scan stopped.
+  *
+  * @return the recover result; unsuccessful with offset -1 when the header cannot be loaded, otherwise
+  *     successful and carrying the last recover position
+  */
+ public RecoverResult recover() {
+     if (!loadHeader()) {
+         return RecoverResult.newInstance(false, false, -1);
+     }
+     final ByteBuffer byteBuffer = sliceByteBuffer();
+     int recoverPosition = this.header.getHeaderSize();
+     int lastLogPosition = recoverPosition;
+     int recoverCnt = 0;
+     boolean isFileEnd = false;
+     final long start = Utils.monotonicMs();
+     for (;;) {
+         byteBuffer.position(recoverPosition);
+         final CheckDataResult checkResult = checkData(byteBuffer);
+         if (checkResult == CheckDataResult.FILE_END) {
+             isFileEnd = true;
+             break;
+         }
+         if (checkResult == CheckDataResult.CHECK_FAIL) {
+             break;
+         }
+         // Valid entry: remember where it starts and step over it.
+         recoverCnt++;
+         lastLogPosition = recoverPosition;
+         recoverPosition += checkResult.size;
+     }
+     updateAllPosition(recoverPosition);
+     onRecoverDone(lastLogPosition);
+     LOG.info("Recover file {} cost {} millis, recoverPosition:{}, recover logs:{}, lastLogIndex:{}", getFilePath(),
+         Utils.monotonicMs() - start, recoverPosition, recoverCnt, getLastLogIndex());
+     final boolean isRecoverTotal = isFileEnd || (recoverPosition == this.fileSize);
+     return RecoverResult.newInstance(true, isRecoverTotal, recoverPosition);
+ }
+
+ /**
+  * Result of {@link #checkData(ByteBuffer)}. On success {@code size} is what {@link #recover()} adds
+  * to its position to step over the checked entry.
+  *
+  * <p>NOTE(review): {@code size} is a mutable field of the enum constant itself, so a value stored
+  * through {@link #setSize(int)} is visible to every user of that constant - confirm that this
+  * sharing is intended.
+  */
+ public enum CheckDataResult {
+ CHECK_SUCCESS(1), // If check success, return dataSize
+ CHECK_FAIL(-1), // If check failed, return -1
+ FILE_END(0); // If come to file end, return 0
+
+ // Initial value comes from the constant declaration; can be overwritten via setSize.
+ private int size;
+
+ CheckDataResult(final int pos) {
+ this.size = pos;
+ }
+
+ /**
+  * Overwrites the size carried by this constant.
+  */
+ public void setSize(final int pos) {
+ this.size = pos;
+ }
+ }
+
+ /**
+  * Checks the data found in the given buffer. {@link #recover()} positions the buffer at the offset
+  * to check before each call.
+  *
+  * @param byteBuffer buffer to read from
+  * @return check result
+  */
+ public abstract CheckDataResult checkData(final ByteBuffer byteBuffer);
+
+ /**
+  * Trigger function when recover done. {@link #recover()} calls it after updating the wrote and
+  * flushed positions.
+  *
+  * @param recoverPosition the recover position; {@link #recover()} passes the start position of the
+  *     last successfully checked entry
+  */
+ public abstract void onRecoverDone(final int recoverPosition);
+
+ /**
+  * Truncate file entries to logIndex.
+  *
+  * @param logIndex the target logIndex
+  * @param pos the position of this entry, this parameter is needed only if this file is a segmentFile
+  * @return an int defined by the implementation; its meaning is not visible in this class
+  */
+ public abstract int truncate(final long logIndex, final int pos);
+
+ /**
+  * Append data to file end under the write lock.
+  *
+  * @param logIndex logEntry index
+  * @param data data array
+  * @return wrote position, i.e. the offset at which {@code data} starts
+  */
+ protected int doAppend(final long logIndex, final byte[] data) {
+     this.writeLock.lock();
+     try {
+         int appendPos = getWrotePosition();
+         if (this.header.isBlank()) {
+             // Very first append: record the first log index, store the header and start behind it.
+             this.header.setFirstLogIndex(logIndex);
+             saveHeader();
+             appendPos = this.header.getHeaderSize();
+         }
+         put(sliceByteBuffer(), appendPos, data);
+         setWrotePosition(appendPos + data.length);
+         this.header.setLastLogIndex(logIndex);
+         return appendPos;
+     } finally {
+         this.writeLock.unlock();
+     }
+ }
+
+ /**
+  * Flush data to disk.
+  *
+  * <p>When the file can be held, the mapped buffer is forced and the flushed position moves to the
+  * wrote position sampled before the force. When the hold fails, nothing is forced and the flushed
+  * position is simply set to the current wrote position.
+  *
+  * @return The current flushed position
+  * @throws RuntimeException wrapping any failure raised while forcing the buffer
+  */
+ public int flush() {
+     if (!hold()) {
+         LOG.warn("In flush, hold failed, flush offset = {}.", getFlushedPosition());
+         setFlushPosition(getWrotePosition());
+         return getFlushedPosition();
+     }
+     try {
+         // Sample before forcing: bytes written while the force runs are not claimed as flushed.
+         final int value = getWrotePosition();
+         try {
+             this.mappedByteBuffer.force();
+         } catch (final Throwable e) {
+             LOG.error("Error occurred when force data to disk.", e);
+             throw new RuntimeException(e);
+         }
+         setFlushPosition(value);
+     } finally {
+         // Give the reference back even when the force failed; it used to leak on that path.
+         release();
+     }
+     return getFlushedPosition();
+ }
+
+ /**
+  * Shuts the resource down and, once its cleanup is over, optionally deletes the backing file.
+  *
+  * @param intervalForcibly value forwarded to the one-argument {@code shutdown}
+  * @param isDestroy whether the backing file should be deleted
+  * @return false if the cleanup is not over; otherwise true, or the deletion result when
+  *     {@code isDestroy} is set
+  * @throws RuntimeException wrapping any failure raised while deleting the file
+  */
+ public boolean shutdown(final long intervalForcibly, final boolean isDestroy) {
+     shutdown(intervalForcibly);
+     if (!isCleanupOver()) {
+         return false;
+     }
+     if (!isDestroy) {
+         return true;
+     }
+     try {
+         return IgniteUtils.deleteIfExists(this.file.toPath());
+     } catch (final Throwable t) {
+         LOG.error("Close file channel failed, {}", getFilePath(), t);
+         throw new RuntimeException(t);
+     }
+ }
+
+ /**
+  * Unmaps the file as part of the resource cleanup.
+  *
+  * @param currentRef not used by this implementation
+  * @return false while the resource is still available, true otherwise
+  */
+ @Override
+ public boolean cleanup(final long currentRef) {
+     if (isAvailable()) {
+         return false;
+     }
+     // Only unmap when the cleanup has not happened yet and there is a mapping to drop.
+     if (!isCleanupOver() && isMapped()) {
+         unmmap();
+     }
+     return true;
+ }
+
+ /**
+  * Load file header from the start of the mapped buffer.
+  *
+  * @return the result of decoding the header
+  */
+ public boolean loadHeader() {
+     final ByteBuffer headerView = sliceByteBuffer();
+     headerView.position(0);
+     final boolean decoded = this.header.decode(headerView);
+     return decoded;
+ }
+
+ /**
+  * Save file header by writing its encoded form to the start of the mapped buffer.
+  * Dont need to flush , header will be flushed by flushService
+  */
+ public void saveHeader() {
+     final ByteBuffer target = sliceByteBuffer();
+     target.position(0);
+     final ByteBuffer encodedHeader = this.header.encode();
+     target.put(encodedHeader);
+ }
+
+ /**
+ * Fill empty bytes in this file end when this fill has no sufficient free space to store one message
+ */
+ public void fillEmptyBytesInFileEnd() {
+ this.writeLock.lock();
+ try {
+ final int wrotePosition = getWrotePosition();
+ final ByteBuffer byteBuffer = sliceByteBuffer();
+ for (int i = wrotePosition; i < this.fileSize; i++) {
+ byteBuffer.put(i, FILE_END_BYTE);
+ }
+ setWrotePosition(this.fileSize);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Clear data in [startPos, startPos+64).
+ */
+ public void clear(final int startPos) {
+ this.writeLock.lock();
+ try {
+ if (startPos < 0 || startPos > this.fileSize) {
+ return;
+ }
+ final int endPos = Math.min(this.fileSize, startPos + BLANK_HOLE_SIZE);
+ for (int i = startPos; i < endPos; i++) {
+ this.mappedByteBuffer.put(i, (byte) 0);
+ }
+ this.mappedByteBuffer.force();
+ LOG.info("File {} cleared data in [{}, {}].", this.filePath, startPos, endPos);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public void put(final ByteBuffer buffer, final int index, final byte[] data) {
+ for (int i = 0; i < data.length; i++) {
+ buffer.put(index + i, data[i]);
+ }
+ }
+
+ /**
+  * Returns the first log index stored in the header.
+  */
+ public long getFirstLogIndex() {
+ return this.header.getFirstLogIndex();
+ }
+
+ /**
+  * Returns the last log index stored in the header.
+  */
+ public long getLastLogIndex() {
+ return this.header.getLastLogIndex();
+ }
+
+ /**
+  * Stores the last log index in the header.
+  */
+ public void setLastLogIndex(final long lastLogIndex) {
+ this.header.setLastLogIndex(lastLogIndex);
+ }
+
+ /**
+  * Stores the file-from offset in the header.
+  */
+ public void setFileFromOffset(final long fileFromOffset) {
+ this.header.setFileFromOffset(fileFromOffset);
+ }
+
+ /**
+  * Returns the file-from offset stored in the header.
+  */
+ public long getFileFromOffset() {
+ return this.header.getFileFromOffset();
+ }
+
+ /**
+  * Returns true when the file contains the log index, i.e. when it lies between the header's first
+  * and last log index (both inclusive). Evaluated under the read lock.
+  *
+  * @param logIndex the log index
+  * @return true if the file contains the log index, otherwise return false
+  */
+ public boolean contains(final long logIndex) {
+     this.readLock.lock();
+     try {
+         if (logIndex < this.header.getFirstLogIndex()) {
+             return false;
+         }
+         return logIndex <= this.getLastLogIndex();
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * Returns a slice of the mapped buffer.
+  */
+ public ByteBuffer sliceByteBuffer() {
+ return this.mappedByteBuffer.slice();
+ }
+
+ /**
+  * Warms the file up: on Linux, and only when the file is mapped, asks for the mapping to be loaded
+  * through {@link #hintLoad()}.
+  */
+ public void warmupFile() {
+     if (isMapped() && Platform.isLinux()) {
+         hintLoad();
+     }
+ }
+
+ /**
+  * On Linux, calls {@code madvise} with {@code MADV_WILLNEED} for the mapped region and logs the
+  * return code together with the elapsed time.
+  */
+ public void hintLoad() {
+     final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+     final Pointer pointer = new Pointer(address);
+     final long beginTime = Utils.monotonicMs();
+     if (!Platform.isLinux()) {
+         return;
+     }
+     final int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
+     LOG.info("madvise(MADV_WILLNEED) {} {} {} ret = {} time consuming = {}", address, this.filePath,
+         this.fileSize, ret, Utils.monotonicMs() - beginTime);
+ }
+
+ /**
+  * On Linux, calls {@code madvise} with {@code MADV_DONTNEED} for the mapped region and logs the
+  * return code together with the elapsed time.
+  */
+ public void hintUnload() {
+     final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
+     final Pointer pointer = new Pointer(address);
+     final long beginTime = Utils.monotonicMs();
+     if (!Platform.isLinux()) {
+         return;
+     }
+     final int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_DONTNEED);
+     LOG.info("madvise(MADV_DONTNEED) {} {} {} ret = {} time consuming = {}", address, this.filePath,
+         this.fileSize, ret, Utils.monotonicMs() - beginTime);
+ }
+
+ /**
+  * Resets the file: wrote and flushed positions go back to 0, the header's first log index is set to
+  * {@code FileHeader.BLANK_OFFSET_INDEX} (which makes the header blank again) and the file is flushed.
+  * The header's last log index is not touched here.
+  */
+ public void reset() {
+ setWrotePosition(0);
+ setFlushPosition(0);
+ this.header.setFirstLogIndex(FileHeader.BLANK_OFFSET_INDEX);
+ flush();
+ }
+
+ /**
+  * Returns the current write position.
+  */
+ public int getWrotePosition() {
+ return wrotePosition.get();
+ }
+
+ /**
+  * Sets the current write position.
+  */
+ public void setWrotePosition(final int position) {
+ this.wrotePosition.set(position);
+ }
+
+ /**
+  * Returns the current flush position.
+  */
+ public int getFlushedPosition() {
+ return flushedPosition.get();
+ }
+
+ /**
+  * Sets the current flush position.
+  */
+ public void setFlushPosition(final int position) {
+ this.flushedPosition.set(position);
+ }
+
+ /**
+  * Update flush / wrote position to pos. The wrote position is set first, then the flush position.
+  *
+  * @param pos target pos
+  */
+ public void updateAllPosition(final int pos) {
+ setWrotePosition(pos);
+ setFlushPosition(pos);
+ }
+
+ /**
+  * Returns the path of the backing file.
+  */
+ public String getFilePath() {
+ return filePath;
+ }
+
+ /**
+  * Tells whether writing {@code waitToWroteSize} more bytes at the current wrote position would go
+  * past the file size.
+  *
+  * @param waitToWroteSize number of bytes about to be written
+  * @return true if wrote position + waitToWroteSize exceeds the file size
+  */
+ public boolean reachesFileEndBy(final int waitToWroteSize) {
+ return getWrotePosition() + waitToWroteSize > getFileSize();
+ }
+
+ /**
+  * Returns the size of this file.
+  */
+ public int getFileSize() {
+ return fileSize;
+ }
+
+ /**
+  * Returns whether the header is blank.
+  */
+ public boolean isBlank() {
+ return this.header.isBlank();
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
new file mode 100644
index 0000000000..d68f54e5f4
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileHeader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * File header:
+ * <pre>
+ *   magic bytes       first log index    file from offset       reserved
+ *   [0x20 0x20]      [... 8 bytes...]   [... 8 bytes...]   [... 8 bytes...]
+ * </pre>
+ *
+ * <p>The encoded form is {@link #HEADER_SIZE} bytes long (2 + 8 + 8 + 8). The last log index is kept
+ * in memory only: it is neither written by {@link #encode()} nor read by {@link #decode(ByteBuffer)}.
+ */
+public class FileHeader {
+    private static final IgniteLogger LOG = Loggers.forClass(FileHeader.class);
+
+    /** Value written into the reserved 8 bytes of the encoded header. */
+    private static final long RESERVED_FLAG = 0L;
+
+    public static final int HEADER_SIZE = 26;
+
+    /** Marker for a log index that has not been set yet. */
+    public static final long BLANK_OFFSET_INDEX = -99;
+
+    private static final byte MAGIC = 0x20;
+
+    private volatile long firstLogIndex = BLANK_OFFSET_INDEX;
+
+    private long fileFromOffset = -1;
+
+    private volatile long lastLogIndex = BLANK_OFFSET_INDEX;
+
+    public FileHeader() {
+        super();
+    }
+
+    /**
+     * Encodes the header into a new buffer: two magic bytes, first log index, file-from offset and
+     * the reserved flag.
+     *
+     * @return a buffer of {@link #HEADER_SIZE} bytes, flipped and ready to be read
+     */
+    public ByteBuffer encode() {
+        final ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
+        buffer.put(MAGIC);
+        buffer.put(MAGIC);
+        buffer.putLong(this.firstLogIndex);
+        buffer.putLong(this.fileFromOffset);
+        buffer.putLong(RESERVED_FLAG);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Decodes the header from the buffer's current position. The reserved bytes are not consumed.
+     *
+     * @param buffer buffer to read from; may be null
+     * @return false if the buffer is null, holds fewer than {@link #HEADER_SIZE} remaining bytes or
+     *     does not start with the two magic bytes; true once both fields were read
+     */
+    public boolean decode(final ByteBuffer buffer) {
+        if (buffer == null || buffer.remaining() < HEADER_SIZE) {
+            LOG.error("Fail to decode file header, invalid buffer length: {}", buffer == null ? 0 : buffer.remaining());
+            return false;
+        }
+        if (buffer.get() != MAGIC) {
+            return false;
+        }
+        if (buffer.get() != MAGIC) {
+            return false;
+        }
+        this.firstLogIndex = buffer.getLong();
+        this.fileFromOffset = buffer.getLong();
+        return true;
+    }
+
+    public long getFirstLogIndex() {
+        return this.firstLogIndex;
+    }
+
+    public void setFirstLogIndex(final long firstLogIndex) {
+        this.firstLogIndex = firstLogIndex;
+    }
+
+    public long getFileFromOffset() {
+        return this.fileFromOffset;
+    }
+
+    public void setFileFromOffset(final long fileFromOffset) {
+        this.fileFromOffset = fileFromOffset;
+    }
+
+    public long getLastLogIndex() {
+        return this.lastLogIndex;
+    }
+
+    public void setLastLogIndex(final long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+    }
+
+    public int getHeaderSize() {
+        return HEADER_SIZE;
+    }
+
+    /**
+     * Returns whether the first log index still has the {@link #BLANK_OFFSET_INDEX} marker value.
+     */
+    public boolean isBlank() {
+        return this.firstLogIndex == BLANK_OFFSET_INDEX;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
new file mode 100644
index 0000000000..ea784bda31
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileManager.java
@@ -0,0 +1,577 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.apache.ignite.raft.jraft.util.Requires;
+
+/**
+ * Uses list to manage AbstractFile(Index,Segment) by logIndex and offset
+ */
+public class FileManager {
+ private static final IgniteLogger LOG = Loggers.forClass(FileManager.class);
+
+ private final String storePath;
+
+ private final FileType fileType;
+
+ private final StoreOptions storeOptions;
+
+ private final int fileSize;
+
+ private final AllocateFileService allocateService;
+
+ private final LogStoreFactory logStoreFactory;
+
+ private final List<AbstractFile> files = new ArrayList<>();
+
+ private volatile long flushedPosition;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Lock readLock = lock.readLock();
+ private final Lock writeLock = lock.writeLock();
+
+ /**
+  * Creates a manager for files of one type under one directory.
+  *
+  * @param fileType type of the managed files
+  * @param fileSize size used for every managed file
+  * @param storePath directory that holds the files
+  * @param logStoreFactory factory used to create file objects; also supplies the store options
+  * @param allocateService service that hands out empty files
+  */
+ public FileManager(final FileType fileType, final int fileSize, final String storePath,
+     final LogStoreFactory logStoreFactory, final AllocateFileService allocateService) {
+     this.fileType = fileType;
+     this.fileSize = fileSize;
+     this.storePath = storePath;
+     this.logStoreFactory = logStoreFactory;
+     this.storeOptions = logStoreFactory.getStoreOptions();
+     this.allocateService = allocateService;
+ }
+
+ /**
+  * Creates an empty {@link FileManagerBuilder}.
+  *
+  * @return a new builder with no properties set
+  */
+ public static FileManagerBuilder newBuilder() {
+ return new FileManagerBuilder();
+ }
+
+ /**
+  * Fluent builder for {@link FileManager}. Every property is mandatory; {@link #build()} rejects
+  * any that is still {@code null}.
+  */
+ public static class FileManagerBuilder {
+     private String storePath;
+
+     private FileType fileType;
+
+     private LogStoreFactory logStoreFactory;
+
+     private Integer fileSize;
+
+     private AllocateFileService allocateService;
+
+     /** Sets the directory that holds the files. */
+     public FileManagerBuilder storePath(final String storePath) {
+         this.storePath = storePath;
+         return this;
+     }
+
+     /** Sets the type of the managed files. */
+     public FileManagerBuilder fileType(final FileType fileType) {
+         this.fileType = fileType;
+         return this;
+     }
+
+     /** Sets the factory used to create file objects. */
+     public FileManagerBuilder logStoreFactory(final LogStoreFactory logStoreFactory) {
+         this.logStoreFactory = logStoreFactory;
+         return this;
+     }
+
+     /** Sets the size of each managed file. */
+     public FileManagerBuilder fileSize(final Integer abstractFileSize) {
+         this.fileSize = abstractFileSize;
+         return this;
+     }
+
+     /** Sets the service that hands out empty files. */
+     public FileManagerBuilder allocateService(final AllocateFileService allocateService) {
+         this.allocateService = allocateService;
+         return this;
+     }
+
+     /**
+      * Checks that every property was supplied and creates the manager.
+      *
+      * @return the configured {@link FileManager}
+      */
+     public FileManager build() {
+         Requires.requireNonNull(this.storePath, "storePath");
+         Requires.requireNonNull(this.fileType, "fileType");
+         Requires.requireNonNull(this.logStoreFactory, "logStoreFactory");
+         Requires.requireNonNull(this.fileSize, "fileSize");
+         Requires.requireNonNull(this.allocateService, "allocateService");
+         return new FileManager(this.fileType, this.fileSize, this.storePath, this.logStoreFactory,
+             this.allocateService);
+     }
+ }
+
+ /**
+  * Loads the files already present in the store path; called when the db recovers.
+  *
+  * <p>The directory is created when missing. Entries are sorted by the sequence number parsed from
+  * their names. Each entry accepted by {@code checkFileCorrectnessAndMmap} is either added to the
+  * managed list (its header loads and is not blank) or reset and handed to the allocate service
+  * for reuse. The allocate service is also told the next free file sequence.
+  *
+  * @return the managed file list, or an immutable empty list when the directory has no entries
+  */
+ public List<AbstractFile> loadExistedFiles() {
+     final File storeDir = new File(this.storePath);
+     if (!storeDir.exists()) {
+         storeDir.mkdirs();
+     }
+     final File[] candidates = storeDir.listFiles();
+     if (candidates == null || candidates.length == 0) {
+         return Collections.emptyList();
+     }
+     Arrays.sort(candidates, Comparator.comparing(this::getFileSequenceFromFileName));
+     final List<AbstractFile> reusable = new ArrayList<>(candidates.length);
+     long nextSequence = 0;
+     for (final File candidate : candidates) {
+         final AbstractFile loaded = checkFileCorrectnessAndMmap(candidate);
+         if (loaded == null) {
+             continue;
+         }
+         if (loaded.loadHeader() && !loaded.isBlank()) {
+             this.files.add(loaded);
+         } else {
+             loaded.reset();
+             reusable.add(loaded);
+         }
+         nextSequence = Math.max(nextSequence, getFileSequenceFromFileName(candidate) + 1);
+     }
+     this.allocateService.setNextFileSequence(nextSequence);
+     // Blank files go back to the allocator so that they can be reused.
+     this.allocateService.addBlankAbstractFiles(reusable);
+     return this.files;
+ }
+
+ /**
+  * Accepts a file only when it exists, its name ends with this manager's file suffix and its
+  * length equals the configured file size.
+  *
+  * @param file candidate file
+  * @return the file object created by the log store factory, or {@code null} when the candidate is rejected
+  */
+ private AbstractFile checkFileCorrectnessAndMmap(final File file) {
+     final boolean nameMatches = file.exists() && file.getName().endsWith(this.fileType.getFileSuffix());
+     if (!nameMatches) {
+         return null;
+     }
+     if (file.length() != this.fileSize) {
+         return null;
+     }
+     return this.logStoreFactory.newFile(this.fileType, file.getPath());
+ }
+
+ /**
+  * Takes a snapshot of the managed files under the read lock.
+  *
+  * @return a new array holding the managed files in list order
+  */
+ public AbstractFile[] copyFiles() {
+     this.readLock.lock();
+     try {
+         final AbstractFile[] snapshot = this.files.toArray(new AbstractFile[0]);
+         return snapshot;
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * Returns the last managed file if it can still take {@code waitToWroteSize} bytes, optionally
+  * appending a new file obtained from the allocate service.
+  *
+  * <p>When the last file would reach its end, its tail is filled with blank bytes and, if
+  * {@code createIfNecessary} is set, a new file is taken from the allocate service under the write
+  * lock. The new file's offset is {@code files.size() * fileSize}.
+  *
+  * @param logIndex log index of the pending write; only used in the error log message
+  * @param waitToWroteSize bytes about to be written; a value {@code <= 0} skips the capacity check
+  * @param createIfNecessary whether a new file may be appended
+  * @return the file to write to; {@code null} when there is none and none may be created, or when
+  *     creation failed before a file was taken
+  */
+ public AbstractFile getLastFile(final long logIndex, final int waitToWroteSize, final boolean createIfNecessary) {
+     AbstractFile lastFile = null;
+     while (true) {
+         int fileCount = 0;
+         this.readLock.lock();
+         try {
+             if (!this.files.isEmpty()) {
+                 fileCount = this.files.size();
+                 final AbstractFile file = this.files.get(fileCount - 1);
+                 if (waitToWroteSize <= 0 || !file.reachesFileEndBy(waitToWroteSize)) {
+                     lastFile = file;
+                 } else if (file.reachesFileEndBy(waitToWroteSize)) {
+                     // Reach file end , need to fill blank bytes in file end
+                     file.fillEmptyBytesInFileEnd();
+                 }
+             }
+         } finally {
+             this.readLock.unlock();
+         }
+         // Try to get a new file
+         if (lastFile == null && createIfNecessary) {
+             this.writeLock.lock();
+             try {
+                 if (this.files.size() != fileCount) {
+                     // That means already create a new file , just continue and try again
+                     continue;
+                 }
+                 lastFile = this.allocateService.takeEmptyFile();
+                 if (lastFile != null) {
+                     final long newFileOffset = (long) this.files.size() * (long) this.fileSize;
+                     lastFile.setFileFromOffset(newFileOffset);
+                     this.files.add(lastFile);
+                     this.swapOutFilesIfNecessary();
+                     return lastFile;
+                 } else {
+                     continue;
+                 }
+             } catch (final Exception e) {
+                 // Pass the exception as the throwable so that the failure cause is not lost.
+                 LOG.error("Error on create new abstract file , current logIndex:{}", e, logIndex);
+             } finally {
+                 this.writeLock.unlock();
+             }
+         }
+         return lastFile;
+     }
+ }
+
+ /**
+  * Unmaps older files once more files are managed than the store options allow to keep in memory.
+  *
+  * <p>Walks the list from newest to oldest. The mapped-file count starts at the allocate service's
+  * allocated file count. A mapped file is unmapped once the count reaches the keep-in-memory
+  * limit, except for the newest file. The flushed position is then raised to the end of the newest
+  * unmapped file if it was lower. Any exception is logged and not propagated.
+  */
+ public void swapOutFilesIfNecessary() {
+     this.readLock.lock();
+     try {
+         if (this.files.size() <= this.storeOptions.getKeepInMemoryFileCount()) {
+             return;
+         }
+         int mappedSoFar = this.allocateService.getAllocatedFileCount();
+         int unmappedNow = 0;
+         final int newest = this.files.size() - 1;
+         long flushedUpTo = 0;
+         for (int i = newest; i >= 0; i--) {
+             final AbstractFile current = this.files.get(i);
+             if (!current.isMapped()) {
+                 continue;
+             }
+             mappedSoFar++;
+             final boolean overLimit = mappedSoFar >= this.storeOptions.getKeepInMemoryFileCount();
+             if (!overLimit || i == newest) {
+                 continue;
+             }
+             current.unmmap();
+             unmappedNow++;
+             // Only the first (newest) unmapped file determines the position.
+             if (flushedUpTo == 0) {
+                 flushedUpTo = current.getFileFromOffset() + current.getFileSize();
+             }
+         }
+         // Everything before flushedUpTo has been flushed, so the flushed position may be advanced to it.
+         if (getFlushedPosition() < flushedUpTo) {
+             setFlushedPosition(flushedUpTo);
+         }
+         LOG.info("Swapped out {} abstract files", unmappedNow);
+     } catch (final Exception e) {
+         LOG.error("Error on swap out files", e);
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * Binary-searches the managed files for the one whose log index range contains {@code logIndex}.
+  *
+  * <p>When exactly one file is managed it is returned without checking its range.
+  *
+  * @param logIndex log index to look up
+  * @param returnFirstIfNotFound whether to fall back to the first file when no range matches
+  * @return the matching file, the first file on fallback, or {@code null}
+  */
+ public AbstractFile findFileByLogIndex(final long logIndex, final boolean returnFirstIfNotFound) {
+     this.readLock.lock();
+     try {
+         if (this.files.isEmpty()) {
+             return null;
+         }
+         if (this.files.size() == 1) {
+             return getFirstFile();
+         }
+         int low = 0;
+         int high = this.files.size() - 1;
+         while (low <= high) {
+             final int middle = (low + high) >>> 1;
+             final AbstractFile candidate = this.files.get(middle);
+             if (candidate.getLastLogIndex() < logIndex) {
+                 low = middle + 1;
+                 continue;
+             }
+             if (candidate.getFirstLogIndex() > logIndex) {
+                 high = middle - 1;
+                 continue;
+             }
+             return candidate;
+         }
+         return returnFirstIfNotFound ? getFirstFile() : null;
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * Finds the file whose log index range contains {@code logIndex} and returns it together with
+  * every file after it.
+  *
+  * @param logIndex begin index
+  * @return the files that contain or follow this logIndex, in list order and without null
+  *     elements; an empty array when no file contains it
+  */
+ public AbstractFile[] findFileFromLogIndex(final long logIndex) {
+     this.readLock.lock();
+     try {
+         final int fileCount = this.files.size();
+         for (int i = 0; i < fileCount; i++) {
+             final AbstractFile file = this.files.get(i);
+             if (file.getFirstLogIndex() <= logIndex && logIndex <= file.getLastLogIndex()) {
+                 // Exactly fileCount - i files remain from position i; a larger array would
+                 // hand callers a trailing null element.
+                 return this.files.subList(i, fileCount).toArray(new AbstractFile[0]);
+             }
+         }
+     } finally {
+         this.readLock.unlock();
+     }
+     return new AbstractFile[0];
+ }
+
+ /**
+  * Finds the managed file whose offset range {@code [fileFromOffset, fileFromOffset + fileSize)}
+  * contains {@code offset}.
+  *
+  * <p>The position is first computed from the offset and the first file's offset; if the file at
+  * that position does not match, the whole list is scanned. An offset outside the range covered
+  * by the first and last file is logged as a warning. Any exception is logged and {@code null}
+  * is returned.
+  *
+  * @param offset offset to look up
+  * @param returnFirstIfNotFound whether to fall back to the first file when nothing matches
+  * @return the matching file, the first file on fallback, or {@code null}
+  */
+ public AbstractFile findFileByOffset(final long offset, final boolean returnFirstIfNotFound) {
+     this.readLock.lock();
+     try {
+         if (this.files.size() == 0) {
+             return null;
+         }
+         final AbstractFile firstAbstractFile = getFirstFile();
+         final AbstractFile lastAbstractFile = getLastFile();
+         if (firstAbstractFile != null && lastAbstractFile != null) {
+             if (offset < firstAbstractFile.getFileFromOffset()
+                 || offset >= lastAbstractFile.getFileFromOffset() + this.fileSize) {
+                 LOG.warn(
+                     "Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, fileSize: {}, fileNums: {}",
+                     offset, firstAbstractFile.getFileFromOffset(), lastAbstractFile.getFileFromOffset()
+                         + this.fileSize, this.fileSize,
+                     this.files.size());
+             } else {
+                 // Locate the index
+                 final int index = (int) ((offset / this.fileSize) - (firstAbstractFile.getFileFromOffset() / this.fileSize));
+                 final AbstractFile targetFile = this.files.get(index);
+
+                 if (targetFile != null && offset >= targetFile.getFileFromOffset()
+                     && offset < targetFile.getFileFromOffset() + this.fileSize) {
+                     return targetFile;
+                 }
+
+                 // If pre not found , then traverse to find target file
+                 for (final AbstractFile abstractFile : this.files) {
+                     if (offset >= abstractFile.getFileFromOffset()
+                         && offset < abstractFile.getFileFromOffset() + this.fileSize) {
+                         return abstractFile;
+                     }
+                 }
+             }
+             if (returnFirstIfNotFound) {
+                 return firstAbstractFile;
+             }
+         }
+     } catch (final Exception e) {
+         // The exception goes in the throwable slot, not after the format arguments, so that
+         // it is logged as the cause rather than treated as a surplus parameter.
+         LOG.error("Error on find abstractFile by offset :{}, file type:{}", e, offset, this.fileType.getFileName());
+     } finally {
+         this.readLock.unlock();
+     }
+     return null;
+ }
+
+ /**
+  * Flushes the file that contains the current flushed position and advances that position.
+  *
+  * <p>When the flushed position is 0 the lookup falls back to the first file.
+  *
+  * @return {@code true} if the flushed position changed; {@code false} if it did not or no file was found
+  */
+ public boolean flush() {
+     final long positionBefore = getFlushedPosition();
+     final boolean fallBackToFirst = positionBefore == 0;
+     final AbstractFile target = findFileByOffset(positionBefore, fallBackToFirst);
+     if (target == null) {
+         return false;
+     }
+     final int flushedInFile = target.flush();
+     setFlushedPosition(target.getFileFromOffset() + flushedInFile);
+     return positionBefore != getFlushedPosition();
+ }
+
+ /**
+  * Returns the last managed file without taking any lock.
+  *
+  * @return the last file, or {@code null} when no file is managed (same contract as {@link #getFirstFile()})
+  */
+ public AbstractFile getLastFile() {
+     // An empty list used to raise IndexOutOfBoundsException here although callers null-check the result.
+     if (this.files.isEmpty()) {
+         return null;
+     }
+     return this.files.get(this.files.size() - 1);
+ }
+
+ /**
+  * Returns the first managed file without taking any lock.
+  *
+  * @return the first file, or {@code null} when no file is managed
+  */
+ public AbstractFile getFirstFile() {
+     return this.files.isEmpty() ? null : this.files.get(0);
+ }
+
+ /**
+  * Truncates everything at or after {@code offset}; only called during recovery.
+  *
+  * <p>The file containing {@code offset} has its write and flush positions set to
+  * {@code offset % fileSize}. Files that start after {@code offset} are shut down and removed
+  * from the managed list.
+  *
+  * @param offset first offset to discard
+  */
+ public void truncateSuffixByOffset(final long offset) {
+     this.readLock.lock();
+     final ArrayList<AbstractFile> obsolete = new ArrayList<>();
+     try {
+         for (final AbstractFile candidate : this.files) {
+             final long endOffset = candidate.getFileFromOffset() + this.fileSize;
+             if (endOffset <= offset) {
+                 // Entirely before the truncation point.
+                 continue;
+             }
+             if (offset < candidate.getFileFromOffset()) {
+                 obsolete.add(candidate);
+                 continue;
+             }
+             final int cutPosition = (int) (offset % this.fileSize);
+             candidate.setWrotePosition(cutPosition);
+             candidate.setFlushPosition(cutPosition);
+         }
+     } finally {
+         this.readLock.unlock();
+         for (final AbstractFile doomed : obsolete) {
+             if (doomed != null) {
+                 doomed.shutdown(1000, true);
+             }
+         }
+         deleteFiles(obsolete);
+     }
+ }
+
+ /**
+  * Destroys every file whose last log index is lower than {@code firstIndexKept}.
+  *
+  * <p>A file with {@code firstLogIndex < firstIndexKept <= lastLogIndex} still holds entries that
+  * must be kept and is therefore retained.
+  *
+  * @param firstIndexKept first log index that must survive
+  * @return always {@code true}
+  */
+ public boolean truncatePrefix(final long firstIndexKept) {
+     this.readLock.lock();
+     final List<AbstractFile> willRemoveFiles = new ArrayList<>();
+     try {
+         for (final AbstractFile abstractFile : this.files) {
+             final long lastLogIndex = abstractFile.getLastLogIndex();
+             if (lastLogIndex < firstIndexKept) {
+                 // Only this file is obsolete; adding the whole list would also destroy the
+                 // files that still hold entries at or after firstIndexKept.
+                 willRemoveFiles.add(abstractFile);
+             }
+         }
+         return true;
+     } finally {
+         this.readLock.unlock();
+         for (final AbstractFile file : willRemoveFiles) {
+             if (file != null) {
+                 file.shutdown(1000, true);
+             }
+         }
+         deleteFiles(willRemoveFiles);
+     }
+ }
+
+ /**
+  * Discards the entries in {@code (lastIndexKept, lastLogIndex]}.
+  *
+  * <p>Files entirely after {@code lastIndexKept} are shut down and removed. The file containing
+  * {@code lastIndexKept} is truncated via {@code truncate(lastIndexKept + 1, pos)}. The flushed
+  * position is lowered to the retained size when it lies beyond it.
+  *
+  * @param lastIndexKept last log index that must survive
+  * @param pos position passed through to the file's {@code truncate}
+  * @return always {@code true}
+  */
+ public boolean truncateSuffix(final long lastIndexKept, final int pos) {
+     if (getLastLogIndex() <= lastIndexKept) {
+         return true;
+     }
+     this.readLock.lock();
+     final List<AbstractFile> discarded = new ArrayList<>();
+     try {
+         long retained = 0;
+         for (final AbstractFile current : this.files) {
+             final long first = current.getFirstLogIndex();
+             final long last = current.getLastLogIndex();
+             if (last <= lastIndexKept) {
+                 // Fully retained file.
+                 retained += this.fileSize;
+                 continue;
+             }
+             if (lastIndexKept < first) {
+                 // Remove
+                 discarded.add(current);
+                 continue;
+             }
+             // Truncate
+             final int keptInFile = current.truncate(lastIndexKept + 1, pos);
+             retained += keptInFile;
+         }
+         // Update flush position
+         if (retained < getFlushedPosition()) {
+             setFlushedPosition(retained);
+         }
+         return true;
+     } finally {
+         this.readLock.unlock();
+         for (final AbstractFile doomed : discarded) {
+             if (doomed != null) {
+                 doomed.shutdown(1000, true);
+             }
+         }
+         deleteFiles(discarded);
+     }
+ }
+
+ /**
+  * Drops every managed file: the list is emptied and the flushed position set to 0 under the write
+  * lock, then each former file is shut down after the lock is released.
+  *
+  * @param nextLogIndex not used by this implementation
+  * @return always {@code true}
+  */
+ public boolean reset(final long nextLogIndex) {
+     final List<AbstractFile> dropped = new ArrayList<>();
+     this.writeLock.lock();
+     try {
+         dropped.addAll(this.files);
+         this.files.clear();
+         setFlushedPosition(0);
+         LOG.info("Destroyed all abstractFiles in path {} by resetting.", this.storePath);
+     } finally {
+         this.writeLock.unlock();
+         for (final AbstractFile droppedFile : dropped) {
+             droppedFile.shutdown(1000, true);
+         }
+     }
+     return true;
+ }
+
+ /**
+  * Removes the given files from the managed list under the write lock.
+  *
+  * <p>Entries of {@code files} that are not currently managed are first dropped from {@code files}
+  * itself, so the argument is modified.
+  *
+  * @param files files to remove
+  */
+ private void deleteFiles(final List<AbstractFile> files) {
+     this.writeLock.lock();
+     try {
+         if (files.isEmpty()) {
+             return;
+         }
+         files.removeIf(candidate -> !this.files.contains(candidate));
+         this.files.removeAll(files);
+     } finally {
+         this.writeLock.unlock();
+     }
+ }
+
+ /**
+  * @return the first log index of the first managed file, or {@code -1} when no file is managed
+  */
+ public long getFirstLogIndex() {
+     this.readLock.lock();
+     try {
+         if (this.files.isEmpty()) {
+             return -1;
+         }
+         final AbstractFile oldest = this.files.get(0);
+         return oldest.getFirstLogIndex();
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * @return the last log index of the last managed file, or {@code -1} when no file is managed
+  */
+ public long getLastLogIndex() {
+     this.readLock.lock();
+     try {
+         if (this.files.isEmpty()) {
+             return -1;
+         }
+         return getLastFile().getLastLogIndex();
+     } finally {
+         this.readLock.unlock();
+     }
+ }
+
+ /**
+  * Shuts down every managed file via {@code shutdown(5000, false)}; the managed list itself is
+  * left unchanged and no lock is taken.
+  */
+ public void shutdown() {
+     for (final AbstractFile managed : this.files) {
+         if (managed == null) {
+             continue;
+         }
+         managed.shutdown(5000, false);
+     }
+ }
+
+ /** @return the current flushed position */
+ public long getFlushedPosition() {
+     return this.flushedPosition;
+ }
+
+ /** @param flushedPosition new flushed position */
+ public synchronized void setFlushedPosition(final long flushedPosition) {
+     this.flushedPosition = flushedPosition;
+ }
+
+ /**
+  * Parses the sequence number from a file name: the text before the first occurrence of this
+  * manager's file suffix, read as a decimal {@code long}.
+  *
+  * @param file file whose name is inspected
+  * @return the parsed sequence, or {@code 0} when the name does not end with the suffix
+  * @throws NumberFormatException if the name ends with the suffix but the leading part is not a number
+  */
+ public long getFileSequenceFromFileName(final File file) {
+     final String fileName = file.getName();
+     final String suffix = this.fileType.getFileSuffix();
+     if (!fileName.endsWith(suffix)) {
+         return 0;
+     }
+     final int suffixStart = fileName.indexOf(suffix);
+     return Long.parseLong(fileName.substring(0, suffixStart));
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java
new file mode 100644
index 0000000000..5d1fbc6946
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/FileType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file;
+
+/**
+ * Kinds of files used by the log store, each with a descriptive name and a file name suffix.
+ */
+public enum FileType {
+    /** Index file. */
+    FILE_INDEX("indexFile", ".i"),
+
+    /** Segment file. */
+    FILE_SEGMENT("segmentFile", ".s"),
+
+    /** Configuration file. */
+    FILE_CONFIGURATION("confFile", ".c");
+
+    private final String fileName;
+
+    private final String fileSuffix;
+
+    FileType(final String fileName, final String fileSuffix) {
+        this.fileName = fileName;
+        this.fileSuffix = fileSuffix;
+    }
+
+    /** @return the descriptive name of this file type */
+    public String getFileName() {
+        return this.fileName;
+    }
+
+    /** @return the file name suffix of this file type, including the leading dot */
+    public String getFileSuffix() {
+        return this.fileSuffix;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java
new file mode 100644
index 0000000000..d3e00a98a9
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/AbortFile.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * A marker file at a fixed path whose content is the GMT date of the last write.
+ */
+public class AbortFile {
+
+    private final String path;
+
+    /**
+     * @param path location of the marker file
+     */
+    public AbortFile(final String path) {
+        this.path = path;
+    }
+
+    /** @return the location of the marker file */
+    public String getPath() {
+        return this.path;
+    }
+
+    /**
+     * Creates the marker file and writes the current date into it.
+     *
+     * @return {@code true} if the file was created, {@code false} if it already existed
+     * @throws IOException if the file cannot be created or written
+     */
+    public boolean create() throws IOException {
+        final boolean created = new File(this.path).createNewFile();
+        if (!created) {
+            return false;
+        }
+        writeDate();
+        return true;
+    }
+
+    /** Overwrites the marker file with the current date in GMT string form followed by a line separator. */
+    @SuppressWarnings("deprecation")
+    private void writeDate() throws IOException {
+        final File target = new File(this.path);
+        try (final FileWriter out = new FileWriter(target, false)) {
+            out.write(new Date().toGMTString());
+            out.write(System.lineSeparator());
+        }
+    }
+
+    /**
+     * Rewrites the date stored in the marker file.
+     *
+     * @throws IOException if the file cannot be written
+     */
+    public void touch() throws IOException {
+        writeDate();
+    }
+
+    /** @return {@code true} if the path denotes an existing regular file */
+    public boolean exists() {
+        final File target = new File(this.path);
+        return target.isFile() && target.exists();
+    }
+
+    /** @return {@code true} if the marker file was deleted */
+    public boolean destroy() {
+        final File target = new File(this.path);
+        return target.delete();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java
new file mode 100644
index 0000000000..bf89b3ffe8
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/Checkpoint.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import java.io.IOException;
+
+import java.nio.file.Path;
+
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.io.MessageFile;
+
+/**
+ * Base class for checkpoints stored in a {@link MessageFile}: subclasses define the byte layout,
+ * this class stores those bytes as the user meta of a {@link LocalFileMeta}.
+ */
+public abstract class Checkpoint {
+
+    private final String path;
+
+    private final RaftMessagesFactory raftMessagesFactory;
+
+    /**
+     * @param path location of the checkpoint file
+     * @param raftOptions options supplying the messages factory
+     */
+    public Checkpoint(final String path, RaftOptions raftOptions) {
+        this.path = path;
+        this.raftMessagesFactory = raftOptions.getRaftMessagesFactory();
+    }
+
+    /**
+     * Encode metadata
+     *
+     * @return the bytes to store
+     */
+    public abstract byte[] encode();
+
+    /**
+     * Decode file data
+     *
+     * @param bs bytes read from the checkpoint file
+     * @return result of the decoding as defined by the subclass
+     */
+    public abstract boolean decode(final byte[] bs);
+
+    /**
+     * Encodes this checkpoint and saves it to the checkpoint file.
+     *
+     * @return the result reported by {@link MessageFile#save}
+     * @throws IOException if saving fails
+     */
+    public synchronized boolean save() throws IOException {
+        final MessageFile target = new MessageFile(this.path);
+        final byte[] payload = encode();
+        final LocalFileMeta meta = this.raftMessagesFactory.localFileMeta().userMeta(payload).build();
+        return target.save(meta, true);
+    }
+
+    /**
+     * Loads the checkpoint file and, when it yields a meta object, decodes its user meta. The result
+     * of the decoding is not reported.
+     *
+     * @throws IOException if loading fails
+     */
+    public void load() throws IOException {
+        final LocalFileMeta meta = new MessageFile(this.path).load();
+        if (meta == null) {
+            return;
+        }
+        decode(meta.userMeta());
+    }
+
+    /** Deletes the checkpoint file if it exists. */
+    public void destroy() {
+        IgniteUtils.deleteIfExists(Path.of(this.path));
+    }
+
+    /** @return the location of the checkpoint file */
+    public String getPath() {
+        return this.path;
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java
new file mode 100644
index 0000000000..cd9b532bc4
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FirstLogIndexCheckpoint.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.Bits;
+
+/**
+ * Checkpoint that stores the first log index of the log storage as 8 bytes.
+ */
+public class FirstLogIndexCheckpoint extends Checkpoint {
+
+    // LogStorage first log index; -1 means "not initialised".
+    public long firstLogIndex = -1;
+
+    /**
+     * @param path location of the checkpoint file
+     * @param raftOptions options passed to {@link Checkpoint}
+     */
+    public FirstLogIndexCheckpoint(final String path, RaftOptions raftOptions) {
+        super(path, raftOptions);
+    }
+
+    /**
+     * firstLogIndex (8 bytes)
+     *
+     * @return a new 8-byte array holding the first log index
+     */
+    @Override
+    public byte[] encode() {
+        byte[] bs = new byte[8];
+        Bits.putLong(bs, 0, this.firstLogIndex);
+        return bs;
+    }
+
+    /**
+     * Reads the first log index from the first 8 bytes of {@code bs}.
+     *
+     * @param bs stored bytes; may be {@code null}
+     * @return {@code false} when {@code bs} is {@code null} or shorter than 8 bytes (the current
+     *     value is then left untouched); otherwise whether the decoded index is non-negative
+     */
+    @Override
+    public boolean decode(final byte[] bs) {
+        // A checkpoint file without user meta yields null here; report it as undecodable instead of throwing.
+        if (bs == null || bs.length < 8) {
+            return false;
+        }
+        this.firstLogIndex = Bits.getLong(bs, 0);
+        return this.firstLogIndex >= 0;
+    }
+
+    /** @param firstLogIndex new first log index */
+    public void setFirstLogIndex(final long firstLogIndex) {
+        this.firstLogIndex = firstLogIndex;
+    }
+
+    /** Sets the first log index back to -1. */
+    public void reset() {
+        this.firstLogIndex = -1;
+    }
+
+    /** @return {@code true} when the first log index is non-negative */
+    public boolean isInit() {
+        return this.firstLogIndex >= 0;
+    }
+
+    @Override
+    public String toString() {
+        return "FirstLogIndexCheckpoint{" + "firstLogIndex=" + firstLogIndex + '}';
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java
new file mode 100644
index 0000000000..0c1736be07
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/assit/FlushStatusCheckpoint.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.assit;
+
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.util.AsciiStringUtil;
+import org.apache.ignite.raft.jraft.util.Bits;
+
+/**
+ * Checkpoint that stores the current file name, the flush position and the last log index.
+ */
+public class FlushStatusCheckpoint extends Checkpoint {
+
+    /** Size of the fixed part: flushPosition (8) + lastLogIndex (8) + name length (4). */
+    private static final int FIXED_PART_SIZE = 20;
+
+    // Current file name
+    public String fileName;
+    // Current flush position.
+    public long flushPosition;
+    // Last logIndex
+    public long lastLogIndex;
+
+    /**
+     * @param path location of the checkpoint file
+     * @param raftOptions options passed to {@link Checkpoint}
+     */
+    public FlushStatusCheckpoint(final String path, RaftOptions raftOptions) {
+        super(path, raftOptions);
+    }
+
+    /**
+     * flushPosition (8 bytes) + lastLogIndex (8 bytes) + path(4 bytes len + string bytes)
+     *
+     * @return a new array holding the encoded checkpoint
+     */
+    @Override
+    public byte[] encode() {
+        byte[] ps = AsciiStringUtil.unsafeEncode(this.fileName);
+        byte[] bs = new byte[FIXED_PART_SIZE + ps.length];
+        Bits.putLong(bs, 0, this.flushPosition);
+        Bits.putLong(bs, 8, this.lastLogIndex);
+        Bits.putInt(bs, 16, ps.length);
+        System.arraycopy(ps, 0, bs, FIXED_PART_SIZE, ps.length);
+        return bs;
+    }
+
+    /**
+     * Reads the checkpoint fields from {@code bs} using the layout written by {@link #encode()}.
+     *
+     * @param bs stored bytes; may be {@code null}
+     * @return {@code false} when {@code bs} is {@code null}, shorter than the fixed part, or its
+     *     stored name length is negative or exceeds the remaining bytes (the fields are then left
+     *     untouched); otherwise whether both numbers are non-negative and the name is not empty
+     */
+    @Override
+    public boolean decode(final byte[] bs) {
+        if (bs == null || bs.length < FIXED_PART_SIZE) {
+            return false;
+        }
+        // Validate the stored length before using it, so that corrupt data is reported as a
+        // decode failure instead of an exception from the string decoding.
+        final int len = Bits.getInt(bs, 16);
+        if (len < 0 || len > bs.length - FIXED_PART_SIZE) {
+            return false;
+        }
+        this.flushPosition = Bits.getLong(bs, 0);
+        this.lastLogIndex = Bits.getLong(bs, 8);
+        this.fileName = AsciiStringUtil.unsafeDecode(bs, FIXED_PART_SIZE, len);
+        return this.flushPosition >= 0 && this.lastLogIndex >= 0 && !this.fileName.isEmpty();
+    }
+
+    /** @param fileName new current file name */
+    public void setFileName(final String fileName) {
+        this.fileName = fileName;
+    }
+
+    /** @param flushPosition new flush position */
+    public void setFlushPosition(final long flushPosition) {
+        this.flushPosition = flushPosition;
+    }
+
+    /** @param lastLogIndex new last log index */
+    public void setLastLogIndex(final long lastLogIndex) {
+        this.lastLogIndex = lastLogIndex;
+    }
+
+    @Override
+    public String toString() {
+        return "FlushStatusCheckpoint{" + "segFilename='" + fileName + '\'' + ", flushPosition=" + flushPosition
+            + ", lastLogIndex=" + lastLogIndex + '}';
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
new file mode 100644
index 0000000000..d2399afbfc
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexFile.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.index;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+
+/**
+ * * File header:
+ * * <pre>
+ * * magic bytes first log index file from offset reserved
+ * * [0x20 0x20] [... 8 bytes...] [... 8 bytes...] [... 8 bytes...]
+ * * </pre>
+ * *
+ * * Every record format is:
+ * * <pre>
+ * * Magic byte index type offset position
+ * * [0x57] [1 byte] [4 bytes] [4 bytes]
+ * *</pre>
+ * *
+ * The implementation of offset index
+ */
+public class IndexFile extends AbstractFile {
+ private static final IgniteLogger LOG = Loggers.forClass(IndexFile.class);
+
+ /**
+ * Magic bytes for data buffer.
+ */
+ public static final byte[] RECORD_MAGIC_BYTES = new byte[] { (byte) 0x57 };
+
+ public static final int RECORD_MAGIC_BYTES_SIZE = RECORD_MAGIC_BYTES.length;
+
+ public IndexFile(final String filePath, final int fileSize) {
+ super(filePath, fileSize, true);
+ }
+
+ /**
+ * The offset entry of Index, including (1 byte magic, 4 bytes offset, 4 bytes position, 1 byte logType)
+ */
+ public static class IndexEntry {
+ // Log index
+ private long logIndex;
+ // Relative offset
+ private int offset;
+ // Physical position
+ private int position;
+ // LogType
+ private byte logType;
+
+ // Index entry size
+ public static final int INDEX_SIZE = 10;
+
+ public IndexEntry(final long logIndex, final int position, final byte logType) {
+ this(logIndex, 0, position, logType);
+ }
+
+ public IndexEntry(final int offset, final int position) {
+ this(0, offset, position, IndexType.IndexSegment.getType());
+ }
+
+ public IndexEntry(final long logIndex, final int offset, final int position, final byte logType) {
+ this.logIndex = logIndex;
+ this.offset = offset;
+ this.position = position;
+ this.logType = logType;
+ }
+
+ public static IndexEntry newInstance() {
+ return new IndexEntry(-1, -1);
+ }
+
+ public void setLogIndex(final long logIndex) {
+ this.logIndex = logIndex;
+ }
+
+ public long getLogIndex() {
+ return logIndex;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public byte getLogType() {
+ return logType;
+ }
+
+ public boolean decode(final ByteBuffer buffer) {
+ if (buffer == null || buffer.remaining() < INDEX_SIZE) {
+ LOG.error("Fail to decode index entry , invalid buffer length: {}",
+ buffer == null ? 0 : buffer.remaining());
+ return false;
+ }
+ final byte[] magic = new byte[1];
+ buffer.get(magic);
+ if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+ LOG.error("Fail to decode index entry, invalid buffer magic");
+ return false;
+ }
+ this.logType = buffer.get();
+ this.offset = buffer.getInt();
+ this.position = buffer.getInt();
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "IndexEntry{" + "logIndex=" + logIndex + ", offset=" + offset + ", position=" + position
+ + ", logType=" + logType + '}';
+ }
+ }
+
+ /**
+ * Write the index entry
+ * @param logIndex the log index
+ * @param position the physical position
+ * @param logType [1] means logEntry , [2] means confEntry
+ */
+ public int appendIndex(final long logIndex, final int position, final byte logType) {
+ this.writeLock.lock();
+ try {
+ assert (logIndex > getLastLogIndex());
+ final byte[] writeData = encodeData(toRelativeOffset(logIndex), position, logType);
+ return doAppend(logIndex, writeData);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private byte[] encodeData(final int offset, final int position, final byte logType) {
+ final ByteBuffer buffer = ByteBuffer.allocate(getIndexSize());
+ // Magics
+ buffer.put(RECORD_MAGIC_BYTES);
+ // logType (segmentLog or conf)
+ buffer.put(logType);
+ // offset from FirstLogIndex
+ buffer.putInt(offset);
+ // phyPosition in segmentFile
+ buffer.putInt(position);
+ buffer.flip();
+ return buffer.array();
+ }
+
+ /**
+ * Find the index entry
+ * @param logIndex the target log index
+ * @return a pair holding this offset and its physical file position.
+ */
+ public IndexEntry lookupIndex(final long logIndex) {
+ mapInIfNecessary();
+ this.readLock.lock();
+ try {
+ final ByteBuffer byteBuffer = sliceByteBuffer();
+ final int slot = (int) (logIndex - this.header.getFirstLogIndex());
+ if (slot < 0) {
+ return IndexEntry.newInstance();
+ } else {
+ return parseEntry(byteBuffer, slot);
+ }
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public CheckDataResult checkData(final ByteBuffer buffer) {
+ if (buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ // Check magic
+ final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+ buffer.get(magic);
+ if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+ return CheckDataResult.FILE_END;
+ }
+ // Check index type
+ final byte indexType = buffer.get();
+ if (indexType != IndexType.IndexSegment.getType() && indexType != IndexType.IndexConf.getType()) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ if (buffer.remaining() < getIndexSize() - RECORD_MAGIC_BYTES_SIZE - 1) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
+ result.setSize(getIndexSize());
+ return result;
+ }
+
+ @Override
+ public void onRecoverDone(final int recoverPosition) {
+ final int indexNum = (recoverPosition - this.header.getHeaderSize()) / IndexEntry.INDEX_SIZE;
+ this.header.setLastLogIndex(this.header.getFirstLogIndex() + indexNum);
+ }
+
+ @Override
+ public int truncate(final long logIndex, final int pos) {
+ this.writeLock.lock();
+ try {
+ if (logIndex < this.header.getFirstLogIndex() || logIndex > this.header.getLastLogIndex()) {
+ return 0;
+ }
+ final int slot = (int) (logIndex - this.header.getFirstLogIndex());
+ return truncateToSlot(slot);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Truncate to a known number of slot.
+ */
+ public int truncateToSlot(final int slot) {
+ this.writeLock.lock();
+ try {
+ this.header.setLastLogIndex(this.header.getFirstLogIndex() + slot - 1);
+ final int lastPos = this.header.getHeaderSize() + slot * getIndexSize();
+ updateAllPosition(lastPos);
+ clear(getWrotePosition());
+ return lastPos;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ /**
+ * Parse an index entry from this index file
+ */
+ private IndexEntry parseEntry(final ByteBuffer buffer, final int n) {
+ final int pos = this.header.getHeaderSize() + n * getIndexSize();
+ buffer.position(pos);
+ final IndexEntry indexEntry = IndexEntry.newInstance();
+ indexEntry.decode(buffer);
+ return indexEntry;
+ }
+
+ /**
+ * Return the relative offset
+ */
+ private int toRelativeOffset(final long offset) {
+ if (this.header.isBlank()) {
+ return 0;
+ } else {
+ return (int) (offset - this.header.getFirstLogIndex());
+ }
+ }
+
+ /**
+ * @return the IndexEntry's phyPosition in this IndexFile
+ */
+ public int calculateIndexPos(final long logIndex) {
+ return (int) (this.header.getHeaderSize() + (logIndex - getFirstLogIndex() + 1) * getIndexSize());
+ }
+
+ public int getIndexSize() {
+ return IndexEntry.INDEX_SIZE;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java
new file mode 100644
index 0000000000..96fd2038be
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/index/IndexType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.index;
+
+/**
+ * Kind of log an index record points to, identified by a one-byte code.
+ */
+public enum IndexType {
+
+    /** Segment log's index. */
+    IndexSegment((byte) 1),
+
+    /** Conf log's index. */
+    IndexConf((byte) 2);
+
+    /** One-byte code of this index type. */
+    private final byte code;
+
+    IndexType(final byte code) {
+        this.code = code;
+    }
+
+    /**
+     * @return the one-byte code of this index type
+     */
+    public byte getType() {
+        return this.code;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
new file mode 100644
index 0000000000..ebc675cc1a
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/file/segment/SegmentFile.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.file.segment;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+
+/**
+ * * File header:
+ * * <pre>
+ * * magic bytes first log index file from offset reserved
+ * * [0x20 0x20] [... 8 bytes...] [... 8 bytes...] [... 8 bytes...]
+ * * <pre>
+ *
+ * * Every record format is:
+ * * <pre>
+ * Magic bytes data length data
+ * [0x57, 0x8A] [4 bytes] [bytes]
+ * *</pre>
+ * *
+ */
+public class SegmentFile extends AbstractFile {
+ private static final IgniteLogger LOG = Loggers.forClass(SegmentFile.class);
+
+ /**
+ * Magic bytes for data buffer.
+ */
+ public static final byte[] RECORD_MAGIC_BYTES = new byte[] { (byte) 0x57, (byte) 0x8A };
+
+ public static final int RECORD_MAGIC_BYTES_SIZE = RECORD_MAGIC_BYTES.length;
+
+ // 4 Bytes for written data length
+ private static final int RECORD_DATA_LENGTH_SIZE = 4;
+
+ public SegmentFile(final String filePath, final int fileSize) {
+ super(filePath, fileSize, true);
+ }
+
+ /**
+ *
+ * Write the data and return it's wrote position.
+ * @param logIndex the log index
+ * @param data data to write
+ * @return the wrote position
+ */
+ public int appendData(final long logIndex, final byte[] data) {
+ this.writeLock.lock();
+ try {
+ assert (logIndex > getLastLogIndex());
+ final byte[] writeData = encodeData(data);
+ return doAppend(logIndex, writeData);
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private byte[] encodeData(final byte[] data) {
+ ByteBuffer buffer = ByteBuffer.allocate(getWriteBytes(data));
+ buffer.put(RECORD_MAGIC_BYTES);
+ buffer.putInt(data.length);
+ buffer.put(data);
+ buffer.flip();
+ return buffer.array();
+ }
+
+ /**
+ * Read data from the position.
+ *
+ * @param logIndex the log index
+ * @param pos the position to read
+ * @return read data
+ */
+ public byte[] lookupData(final long logIndex, final int pos) {
+ assert (pos >= this.header.getHeaderSize());
+ mapInIfNecessary();
+ this.readLock.lock();
+ try {
+ if (logIndex < this.header.getFirstLogIndex() || logIndex > this.getLastLogIndex()) {
+ LOG.warn(
+ "Try to read data from segment file {} out of range, logIndex={}, readPos={}, firstLogIndex={}, lastLogIndex={}.",
+ getFilePath(), logIndex, pos, this.header.getFirstLogIndex(), getLastLogIndex());
+ return null;
+ }
+ if (pos > getFlushedPosition()) {
+ LOG.warn(
+ "Try to read data from segment file {} out of comitted position, logIndex={}, readPos={}, wrotePos={}, flushPos={}.",
+ getFilePath(), logIndex, pos, getWrotePosition(), getFlushedPosition());
+ return null;
+ }
+ return lookupData(pos);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ /**
+ * Read data from the position
+ * @param pos the position to read
+ * @return read data
+ */
+ public byte[] lookupData(final int pos) {
+ assert (pos >= this.header.getHeaderSize());
+ mapInIfNecessary();
+ this.readLock.lock();
+ try {
+ final ByteBuffer readBuffer = sliceByteBuffer();
+ readBuffer.position(pos);
+ if (readBuffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+ return null;
+ }
+ final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+ readBuffer.get(magic);
+ if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+ return null;
+ }
+ final int dataLen = readBuffer.getInt();
+ if (dataLen <= 0) {
+ return null;
+ }
+ final byte[] data = new byte[dataLen];
+ readBuffer.get(data);
+ return data;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public CheckDataResult checkData(final ByteBuffer buffer) {
+ if (buffer.remaining() < RECORD_MAGIC_BYTES_SIZE) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ // Check magic
+ final byte[] magic = new byte[RECORD_MAGIC_BYTES_SIZE];
+ buffer.get(magic);
+ if (!Arrays.equals(magic, RECORD_MAGIC_BYTES)) {
+ return CheckDataResult.FILE_END;
+ }
+ // Check len
+ if (buffer.remaining() < RECORD_DATA_LENGTH_SIZE) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ final int dataLen = buffer.getInt();
+ if (buffer.remaining() < dataLen) {
+ return CheckDataResult.CHECK_FAIL;
+ }
+ final CheckDataResult result = CheckDataResult.CHECK_SUCCESS;
+ result.setSize(RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + dataLen);
+ return result;
+ }
+
+ @Override
+ public void onRecoverDone(final int recoverPosition) {
+ // Since the logs index in the segmentFile are discontinuous, we should set LastLogIndex by reading and deSerializing last entry log
+ final ByteBuffer buffer = sliceByteBuffer();
+ buffer.position(recoverPosition);
+ final byte[] data = lookupData(recoverPosition);
+ if (data != null) {
+ final LogEntry lastEntry = LogEntryV1CodecFactory.getInstance().decoder().decode(data);
+ if (lastEntry != null) {
+ setLastLogIndex(lastEntry.getId().getIndex());
+ }
+ }
+ }
+
+ @Override
+ public int truncate(final long logIndex, final int pos) {
+ this.writeLock.lock();
+ try {
+ if (logIndex < this.header.getFirstLogIndex() || logIndex > this.header.getLastLogIndex()) {
+ return 0;
+ }
+ if (pos < 0) {
+ return getWrotePosition();
+ }
+ updateAllPosition(pos);
+ clear(getWrotePosition());
+ this.header.setLastLogIndex(logIndex - 1);
+ return pos;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ public static int getWriteBytes(final byte[] data) {
+ return RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java
new file mode 100644
index 0000000000..7bd491ac84
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/AllocateFileService.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.service;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ShutdownAbleThread;
+import org.apache.ignite.raft.jraft.util.ArrayDeque;
+import org.apache.ignite.raft.jraft.util.OnlyForTest;
+
+/**
+ * Pre allocate abstractFile service.
+ *
+ * <p>A background loop ({@link #run()}) keeps a queue of blank files filled up to
+ * {@code storeOptions.getPreAllocateFileCount()}, and {@link #takeEmptyFile()} hands them out.
+ * The two sides are coordinated through {@code allocateLock} and its two conditions.
+ */
+public class AllocateFileService extends ShutdownAbleThread {
+    // Type of the files this service creates.
+    private final FileType fileType;
+    // Directory the new files are created in.
+    private final String storePath;
+    private final StoreOptions storeOptions;
+    // Factory used to create the new files.
+    private final LogStoreFactory logStoreFactory;
+    // Pre-allocated files
+    private final ArrayDeque<AllocatedResult> blankFiles = new ArrayDeque<>();
+    // Abstract file sequence.
+    private final AtomicLong nextFileSequence = new AtomicLong(0);
+    // Guards the allocate/take hand-off on blankFiles.
+    private final Lock allocateLock = new ReentrantLock();
+    // Awaited by the allocating side while the queue is full; signalled when a file is taken.
+    private final Condition fullCond = this.allocateLock.newCondition();
+    // Awaited by takeEmptyFile() while the queue is empty; signalled when a file is added.
+    private final Condition emptyCond = this.allocateLock.newCondition();
+
+    public AllocateFileService(final AbstractDB abstractDB, final LogStoreFactory logStoreFactory) {
+        this.fileType = abstractDB.getDBFileType();
+        this.storePath = abstractDB.getStorePath();
+        this.storeOptions = logStoreFactory.getStoreOptions();
+        this.logStoreFactory = logStoreFactory;
+    }
+
+    @OnlyForTest
+    public AllocateFileService(final FileType fileType, final String storePath, final LogStoreFactory logStoreFactory) {
+        this.fileType = fileType;
+        this.storePath = storePath;
+        this.logStoreFactory = logStoreFactory;
+        this.storeOptions = logStoreFactory.getStoreOptions();
+    }
+
+    /**
+     * Queue element wrapping one pre-allocated file.
+     */
+    public static class AllocatedResult {
+        AbstractFile abstractFile;
+
+        public AllocatedResult(final AbstractFile abstractFile) {
+            super();
+            this.abstractFile = abstractFile;
+        }
+    }
+
+    /**
+     * Allocation loop: keeps allocating until the service is stopped or the thread is interrupted,
+     * then runs {@link #onShutdown()}.
+     */
+    @Override
+    public void run() {
+        try {
+            while (!isStopped()) {
+                doAllocateFileInLock();
+            }
+        } catch (final InterruptedException ignored) {
+            // Interruption ends the loop; fall through to onShutdown().
+        }
+        onShutdown();
+    }
+
+    /**
+     * Shuts down every file still waiting in the queue.
+     */
+    @Override
+    public void onShutdown() {
+        // Destroy all empty file
+        // NOTE(review): blankFiles is iterated here without holding allocateLock - confirm no
+        // concurrent takeEmptyFile() can run at this point.
+        for (final AllocatedResult result : this.blankFiles) {
+            if (result.abstractFile != null) {
+                result.abstractFile.shutdown(5000, false);
+            }
+        }
+    }
+
+    /**
+     * Creates the next file and warms it up when {@code storeOptions.isEnableWarmUpFile()} is set.
+     */
+    private AbstractFile allocateNewAbstractFile() {
+        final String newFilePath = getNewFilePath();
+        final AbstractFile file = this.logStoreFactory.newFile(this.fileType, newFilePath);
+        if (this.storeOptions.isEnableWarmUpFile()) {
+            file.warmupFile();
+        }
+        return file;
+    }
+
+    /**
+     * Allocates one file and appends it to the queue. Called with {@code allocateLock} held.
+     */
+    private void doAllocateFile0() {
+        final AbstractFile abstractFile = allocateNewAbstractFile();
+        this.blankFiles.add(new AllocatedResult(abstractFile));
+    }
+
+    /**
+     * Waits until the queue has room, allocates one file and wakes up one waiting taker.
+     *
+     * @throws InterruptedException if interrupted while waiting for room in the queue
+     */
+    private void doAllocateFileInLock() throws InterruptedException {
+        this.allocateLock.lock();
+        try {
+            // Loop guards against spurious wake-ups.
+            while (this.blankFiles.size() >= this.storeOptions.getPreAllocateFileCount()) {
+                this.fullCond.await();
+            }
+            // The file is created (and possibly warmed up) while the lock is held.
+            doAllocateFile0();
+            this.emptyCond.signal();
+        } finally {
+            this.allocateLock.unlock();
+        }
+    }
+
+    /**
+     * Takes the oldest pre-allocated file, blocking while none is available.
+     *
+     * @return a pre-allocated file
+     * @throws Exception if interrupted while waiting for a file
+     */
+    public AbstractFile takeEmptyFile() throws Exception {
+        this.allocateLock.lock();
+        try {
+            // Loop guards against spurious wake-ups.
+            while (this.blankFiles.isEmpty()) {
+                this.emptyCond.await();
+            }
+            final AllocatedResult result = this.blankFiles.pollFirst();
+            // One slot has been freed: let the allocating side continue.
+            this.fullCond.signal();
+            return result.abstractFile;
+        } finally {
+            this.allocateLock.unlock();
+        }
+    }
+
+    /**
+     * @return the number of files currently waiting in the queue (read without holding the lock)
+     */
+    public int getAllocatedFileCount() {
+        return this.blankFiles.size();
+    }
+
+    /**
+     * Builds the path of the next file: store path + 19-digit zero-padded sequence + file suffix.
+     * Advances the sequence.
+     */
+    private String getNewFilePath() {
+        return this.storePath + File.separator + String.format("%019d", this.nextFileSequence.getAndIncrement())
+            + this.fileType.getFileSuffix();
+    }
+
+    public void setNextFileSequence(final long sequence) {
+        this.nextFileSequence.set(sequence);
+    }
+
+    /**
+     * Adds already existing blank files to the queue.
+     * NOTE(review): no lock is taken and no condition is signalled here - presumably called before
+     * the service is started; confirm against callers.
+     */
+    public void addBlankAbstractFiles(final List<AbstractFile> blankFiles) {
+        for (final AbstractFile blankFile : blankFiles) {
+            this.blankFiles.add(new AllocatedResult(blankFile));
+        }
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java
new file mode 100644
index 0000000000..412992edb0
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/storage/service/ServiceManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.storage.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.Lifecycle;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.util.concurrent.ShutdownAbleThread;
+
+/**
+ * Owns the background services of one DB (currently only the file allocate service) and
+ * starts / stops them together.
+ */
+public class ServiceManager implements Lifecycle<LogStoreFactory> {
+    private static final IgniteLogger LOG = Loggers.forClass(ServiceManager.class);
+
+    private final AbstractDB abstractDB;
+    private AllocateFileService allocateService;
+    // Maybe we will add more services in the future
+    private List<ShutdownAbleThread> serviceList;
+    // True between a successful start() and the following shutdown().
+    private final AtomicBoolean start = new AtomicBoolean(false);
+
+    public ServiceManager(final AbstractDB abstractDB) {
+        this.abstractDB = abstractDB;
+    }
+
+    /**
+     * Creates the allocate service through the factory and registers it in the service list.
+     *
+     * @param logStoreFactory factory used to create the services
+     * @return always {@code true}
+     */
+    @Override
+    public boolean init(final LogStoreFactory logStoreFactory) {
+        this.allocateService = logStoreFactory.newAllocateService(this.abstractDB);
+        this.serviceList = new ArrayList<>(1);
+        this.serviceList.add(this.allocateService);
+        return true;
+    }
+
+    /**
+     * Starts every registered service; a no-op if the manager is already started.
+     */
+    public void start() {
+        if (this.start.compareAndSet(false, true)) {
+            this.serviceList.forEach(ShutdownAbleThread::start);
+        }
+    }
+
+    /**
+     * Stops the allocate service (interrupting its thread); a no-op if the manager is not started.
+     * Failures are logged, not rethrown.
+     */
+    @Override
+    public void shutdown() {
+        if (this.start.compareAndSet(true, false)) {
+            try {
+                this.allocateService.shutdown(true);
+            } catch (final Exception e) {
+                LOG.error("Error on shutdown {}'s serviceManager,", this.abstractDB.getDBName(), e);
+            }
+        }
+    }
+
+    public AllocateFileService getAllocateService() {
+        return this.allocateService;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java
new file mode 100644
index 0000000000..28d85b1ddd
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/Pair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util;
+
+import java.io.Serializable;
+
+/**
+ * Immutable holder of two values.
+ *
+ * @param <K> type of the first value
+ * @param <V> type of the second value
+ */
+public class Pair<K, V> implements Serializable {
+    private final K key;
+    private final V value;
+
+    public Pair(final K key, final V value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * Static factory, equivalent to calling the constructor.
+     */
+    public static <K, V> Pair<K, V> of(final K key, final V value) {
+        return new Pair<>(key, value);
+    }
+
+    /**
+     * @return the first value (the key)
+     */
+    public K getFirst() {
+        return this.key;
+    }
+
+    /**
+     * @return the second value
+     */
+    public V getSecond() {
+        return this.value;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("Pair{");
+        text.append("key=").append(this.key);
+        text.append(", value=").append(this.value);
+        return text.append('}').toString();
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java
new file mode 100644
index 0000000000..39595adca6
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ReferenceResource.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util.concurrent;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Reference counting base class, similar to the C++ smart pointer implementation
+ * Forked from <a href="https://github.com/apache/rocketmq">rocketmq</a>.
+ *
+ * <p>The count starts at 1. {@link #hold()} adds a reference, {@link #release()} drops one, and
+ * {@link #cleanup(long)} is invoked when a release brings the count to zero or below.
+ */
+public abstract class ReferenceResource {
+    // Starts at 1; that initial reference is dropped by the first shutdown() call.
+    protected final AtomicLong refCount = new AtomicLong(1);
+    // Set to false by the first shutdown() call; hold() fails afterwards.
+    protected volatile boolean available = true;
+    // Result of the last cleanup() call.
+    protected volatile boolean cleanupOver = false;
+    // Wall-clock time (ms) of the first shutdown() call.
+    private volatile long firstShutdownTimestamp = 0;
+
+    /**
+     * Whether the resource can hold
+     *
+     * @return {@code true} if a reference was acquired; {@code false} if the resource is no longer
+     *         available or its count had already dropped to zero or below
+     */
+    public synchronized boolean hold() {
+        if (this.isAvailable()) {
+            if (this.refCount.getAndIncrement() > 0) {
+                return true;
+            } else {
+                // The count was already <= 0: undo the increment.
+                this.refCount.getAndDecrement();
+            }
+        }
+        return false;
+    }
+
+    public boolean isAvailable() {
+        return this.available;
+    }
+
+    /**
+     * Shuts the resource down.
+     *
+     * <p>The first call marks the resource unavailable, records the time and drops the initial
+     * reference. A later call, made while references are still held and at least
+     * {@code intervalForcibly} milliseconds after the first one, forces the count negative and
+     * releases, which triggers {@link #cleanup(long)}.
+     *
+     * @param intervalForcibly grace period in milliseconds before a forced release
+     */
+    public void shutdown(final long intervalForcibly) {
+        if (this.available) {
+            this.available = false;
+            this.firstShutdownTimestamp = System.currentTimeMillis();
+            this.release();
+        } else if (this.getRefCount() > 0) {
+            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
+                // Force the count negative so the release() below reaches cleanup().
+                this.refCount.set(-1000 - this.getRefCount());
+                this.release();
+            }
+        }
+    }
+
+    /**
+     * Release resource
+     *
+     * <p>Drops one reference; when the count reaches zero or below, runs {@link #cleanup(long)}
+     * under this object's monitor and stores its result.
+     */
+    public void release() {
+        long value = this.refCount.decrementAndGet();
+        if (value > 0) {
+            return;
+        }
+        synchronized (this) {
+            this.cleanupOver = this.cleanup(value);
+        }
+    }
+
+    public long getRefCount() {
+        return this.refCount.get();
+    }
+
+    /**
+     * @return {@code true} if no reference is held and the last {@link #cleanup(long)} returned {@code true}
+     */
+    public boolean isCleanupOver() {
+        return this.refCount.get() <= 0 && this.cleanupOver;
+    }
+
+    /**
+     * Frees the underlying resource.
+     *
+     * @param currentRef the reference count observed by the releasing call (zero or negative)
+     * @return the value to store as the "cleanup over" flag
+     */
+    public abstract boolean cleanup(final long currentRef);
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java
new file mode 100644
index 0000000000..ac04aaebfc
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/logit/util/concurrent/ShutdownAbleThread.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.util.concurrent;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * Base class of a service that runs its {@link #run()} loop on a dedicated thread and can be
+ * stopped through {@link #shutdown(boolean)}.
+ */
+public abstract class ShutdownAbleThread implements Runnable {
+    private static final IgniteLogger LOG = Loggers.forClass(ShutdownAbleThread.class);
+
+    // Maximum time (ms) shutdown() waits for the service thread to finish.
+    private static final long JOIN_TIME = 90 * 1000;
+
+    private Thread thread;
+    // Stop flag polled by the run loop through isStopped().
+    protected volatile boolean stopped = false;
+    // True between a successful start() and the following shutdown().
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    public ShutdownAbleThread() {
+    }
+
+    /**
+     * Hook for subclasses to release their resources on shutdown.
+     */
+    public abstract void onShutdown();
+
+    public String getServiceName() {
+        return getClass().getSimpleName();
+    }
+
+    /**
+     * Starts the service thread; a no-op if the service is already started.
+     */
+    public void start() {
+        LOG.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), this.started.get(),
+            this.thread);
+        if (!this.started.compareAndSet(false, true)) {
+            return;
+        }
+        this.stopped = false;
+        this.thread = new Thread(this, getServiceName());
+        this.thread.start();
+    }
+
+    /**
+     * Stops the service without interrupting its thread.
+     */
+    public void shutdown() {
+        this.shutdown(false);
+    }
+
+    /**
+     * Raises the stop flag and waits up to {@code JOIN_TIME} milliseconds for the service thread to
+     * finish; a no-op if the service is not started.
+     *
+     * @param interrupt whether to interrupt the service thread before waiting for it
+     */
+    public void shutdown(final boolean interrupt) {
+        LOG.info("Try to shutdown service thread:{} started:{} lastThread:{}", getServiceName(), this.started.get(),
+            this.thread);
+        if (!this.started.compareAndSet(true, false)) {
+            return;
+        }
+        this.stopped = true;
+        try {
+            if (interrupt) {
+                this.thread.interrupt();
+            }
+            this.thread.join(JOIN_TIME);
+        } catch (final InterruptedException e) {
+            LOG.error("Error when shutdown thread, serviceName:{}", getServiceName(), e);
+            // The exception cleared the caller's interrupt flag: restore it so code further up the
+            // stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Sleeps for the given number of milliseconds.
+     *
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    protected void waitForRunning(long interval) throws InterruptedException {
+        Thread.sleep(interval);
+    }
+
+    public boolean isStopped() {
+        return this.stopped;
+    }
+
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
index 42884f6b59..cf9bdfe8f9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.entity.LocalFileMetaBuilder;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.FileSource;
import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
import org.apache.ignite.raft.jraft.error.RaftError;
@@ -108,7 +109,9 @@ public class LocalSnapshotWriter extends SnapshotWriter {
public boolean addFile(final String fileName, final Message fileMeta) {
final LocalFileMetaBuilder metaBuilder = msgFactory.localFileMeta();
if (fileMeta != null) {
- metaBuilder.source(((LocalFileMeta)fileMeta).source());
+ FileSource source = ((LocalFileMeta)fileMeta).source();
+
+ metaBuilder.sourceNumber(source == null ? 0 : source.getNumber());
metaBuilder.checksum(((LocalFileMeta)fileMeta).checksum());
}
final LocalFileMeta meta = metaBuilder.build();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
index 2316bf5a34..01e74095f2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/util/Platform.java
@@ -31,6 +31,8 @@ public class Platform {
private static final boolean IS_MAC = isMac0();
+ private static final boolean IS_LINUX = isLinux0();
+
/**
* Return {@code true} if the JVM is running on Windows
*/
@@ -45,6 +47,13 @@ public class Platform {
return IS_MAC;
}
+ /**
+ * Return {@code true} if the JVM is running on Linux
+ */
+ public static boolean isLinux() {
+ return IS_LINUX;
+ }
+
private static boolean isMac0() {
final boolean mac = SystemPropertyUtil.get("os.name", "") //
.toLowerCase(Locale.US) //
@@ -64,4 +73,14 @@ public class Platform {
}
return windows;
}
+
+    /** Detects Linux by searching the lower-cased {@code os.name} system property for "linux". */
+    private static boolean isLinux0() {
+        final String osName = SystemPropertyUtil.get("os.name", "").toLowerCase(Locale.US);
+        final boolean linux = osName.contains("linux");
+        if (linux) {
+            LOG.debug("Platform: Linux");
+        }
+        return linux;
+    }
}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
index 6b0d4d6907..85bd719d13 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/BaseLogStorageTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.ignite.raft.jraft.storage.impl;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -36,12 +42,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
public abstract class BaseLogStorageTest extends BaseStorageTest {
protected LogStorage logStorage;
private ConfigurationManager confManager;
@@ -90,15 +90,15 @@ public abstract class BaseLogStorageTest extends BaseStorageTest {
assertEquals(entry1, this.logStorage.getEntry(100));
assertEquals(1, this.logStorage.getTerm(100));
- final LogEntry entry2 = TestUtils.mockEntry(200, 2);
+ final LogEntry entry2 = TestUtils.mockEntry(101, 2);
assertTrue(this.logStorage.appendEntry(entry2));
assertEquals(100, this.logStorage.getFirstLogIndex());
- assertEquals(200, this.logStorage.getLastLogIndex());
+ assertEquals(101, this.logStorage.getLastLogIndex());
assertEquals(entry1, this.logStorage.getEntry(100));
- assertEquals(entry2, this.logStorage.getEntry(200));
+ assertEquals(entry2, this.logStorage.getEntry(101));
assertEquals(1, this.logStorage.getTerm(100));
- assertEquals(2, this.logStorage.getTerm(200));
+ assertEquals(2, this.logStorage.getTerm(101));
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
index 06064fc527..405426d896 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/io/MessageFileTest.java
@@ -16,16 +16,16 @@
*/
package org.apache.ignite.raft.jraft.storage.io;
-import java.io.File;
-import org.apache.ignite.raft.jraft.RaftMessagesFactory;
-import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter;
-import org.junit.jupiter.api.Test;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.entity.LocalFileMetaOutter;
+import org.junit.jupiter.api.Test;
+
public class MessageFileTest {
@Test
public void testSaveLoad() throws Exception {
@@ -37,7 +37,7 @@ public class MessageFileTest {
LocalFileMetaOutter.LocalFileMeta msg = new RaftMessagesFactory()
.localFileMeta()
.checksum("test")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_REFERENCE)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_REFERENCE.getNumber())
.build();
assertTrue(file.save(msg, true));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java
new file mode 100644
index 0000000000..53a1ebb3c3
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/BaseLogitStorageTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import static org.apache.ignite.raft.jraft.test.TestUtils.mockEntry;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.BaseStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.factory.LogStoreFactory;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+
+/**
+ * Common fixture for the logit storage tests: prepares small file sizes, a {@link LogStoreFactory},
+ * a configuration manager and the V1 log entry codec.
+ */
+public class BaseLogitStorageTest extends BaseStorageTest {
+    /** Store options handed to the {@link LogStoreFactory}; file sizes are overridden in {@link #setup()}. */
+    protected StoreOptions storeOptions = new StoreOptions();
+
+    /** Size of a single index entry, taken from {@link IndexEntry#INDEX_SIZE}. */
+    protected int indexEntrySize;
+
+    /** Size of a file header, taken from {@link FileHeader#HEADER_SIZE}. */
+    protected int headerSize;
+
+    /** Index file size used by the tests: one header plus ten index entries. */
+    protected int indexFileSize;
+
+    /** Size used for both segment files and conf files. */
+    protected int segmentFileSize;
+
+    protected ConfigurationManager confManager;
+
+    protected LogEntryCodecFactory logEntryCodecFactory;
+
+    protected LogStoreFactory logStoreFactory;
+
+    /** Type byte of {@link IndexType#IndexSegment}. */
+    protected final byte segmentIndex = IndexType.IndexSegment.getType();
+
+    /**
+     * Computes the file sizes, applies them to {@link #storeOptions} and creates the shared collaborators.
+     *
+     * @throws Exception Declared so that subclasses may throw from their own setup.
+     */
+    public void setup() throws Exception {
+        this.indexEntrySize = IndexEntry.INDEX_SIZE;
+        this.headerSize = FileHeader.HEADER_SIZE;
+        this.indexFileSize = this.headerSize + 10 * this.indexEntrySize;
+        this.segmentFileSize = 300;
+
+        this.storeOptions.setIndexFileSize(this.indexFileSize);
+        this.storeOptions.setSegmentFileSize(this.segmentFileSize);
+        this.storeOptions.setConfFileSize(this.segmentFileSize);
+        this.logStoreFactory = new LogStoreFactory(this.storeOptions, new RaftOptions());
+
+        this.confManager = new ConfigurationManager();
+        this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+    }
+
+    /**
+     * Encodes a mocked log entry and checks that the encoded form has exactly the requested length.
+     *
+     * @param index Log index of the mocked entry.
+     * @param term Term of the mocked entry.
+     * @param size Expected length of the encoded entry; the mocked entry is created with {@code size - 16}.
+     * @return Encoded entry bytes.
+     */
+    protected byte[] genData(final int index, final int term, int size) {
+        final LogEntry mocked = mockEntry(index, term, size - 16);
+        final byte[] encoded = LogEntryV1CodecFactory.getInstance().encoder().encode(mocked);
+        assertEquals(size, encoded.length);
+        return encoded;
+    }
+
+    /**
+     * Creates log storage options wired with the fixture's configuration manager and codec factory.
+     *
+     * @return New options instance.
+     */
+    protected LogStorageOptions newLogStorageOptions() {
+        final LogStorageOptions options = new LogStorageOptions();
+        options.setConfigurationManager(this.confManager);
+        options.setLogEntryCodecFactory(this.logEntryCodecFactory);
+        return options;
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
new file mode 100644
index 0000000000..2fa2fe0590
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/LogitLogStorageTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit;
+
+import static org.apache.ignite.raft.jraft.entity.PeerId.emptyPeer;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.BaseLogStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.option.StoreOptions;
+import org.apache.ignite.raft.jraft.storage.logit.storage.LogitLogStorage;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.apache.ignite.raft.jraft.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Runs the common log storage tests against {@link LogitLogStorage} and adds tests for re-aligning
+ * the index DB with the log DBs on restart.
+ */
+public class LogitLogStorageTest extends BaseLogStorageTest {
+    private LogitLogStorageFactory logStorageFactory;
+
+    /** Starts the storage factory before the base fixture is set up. */
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        this.logStorageFactory = new LogitLogStorageFactory(testStoreOptions());
+        this.logStorageFactory.start();
+
+        super.setup();
+    }
+
+    /** Builds store options with 512 KiB segment and conf files and file warm-up disabled. */
+    private static StoreOptions testStoreOptions() {
+        final StoreOptions options = new StoreOptions();
+
+        options.setSegmentFileSize(512 * 1024);
+        options.setConfFileSize(512 * 1024);
+        options.setEnableWarmUpFile(false);
+
+        return options;
+    }
+
+    /** Closes the storage factory, then tears down the base fixture. */
+    @AfterEach
+    @Override
+    public void teardown() {
+        this.logStorageFactory.close();
+
+        super.teardown();
+    }
+
+    @Override
+    protected LogStorage newLogStorage() {
+        final String storagePath = this.path.toString();
+        return this.logStorageFactory.createLogStorage(storagePath, new RaftOptions());
+    }
+
+    /************************ Test consistency between dbs ***********************************/
+
+    /**
+     * Truncates the index DB behind the storage's back and checks that, after a restart,
+     * all 20 entries are readable again.
+     */
+    @Test
+    public void testAlignLogWhenLostIndex() {
+        final List<LogEntry> entries = TestUtils.mockEntries(20);
+        // Turn entries 13..16 into configuration entries.
+        for (final LogEntry confEntry : entries.subList(13, 17)) {
+            confEntry.setType(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+            confEntry.setPeers(List.of(emptyPeer()));
+        }
+        this.logStorage.appendEntries(entries);
+        assertEquals(19, this.logStorage.getLastLogIndex());
+
+        // Truncate the index DB at index 12; the test expects the storage to restore the missing indexes on restart.
+        final IndexDB truncatedIndexDb = ((LogitLogStorage) this.logStorage).getIndexDB();
+        truncatedIndexDb.truncateSuffix(12, 0);
+        this.logStorage.shutdown();
+        this.logStorage.init(newLogStorageOptions());
+
+        assertEquals(19, this.logStorage.getLastLogIndex());
+        for (int logIndex = 0; logIndex <= 19; logIndex++) {
+            final LogEntry restored = this.logStorage.getEntry(logIndex);
+            assertEquals(logIndex, restored.getId().getIndex());
+        }
+    }
+
+    /**
+     * Appends extra entries to the index DB only and checks that, after a restart,
+     * the index DB ends at index 14 again.
+     */
+    @Test
+    public void testAlignLogWhenMoreIndex() {
+        final List<LogEntry> entries = TestUtils.mockEntries(15);
+        this.logStorage.appendEntries(entries);
+        // Append index entries 15..20 that have no matching log entries.
+        final IndexDB indexDbBeforeRestart = ((LogitLogStorage) this.logStorage).getIndexDB();
+        long highestFlushPosition = 0;
+        for (int logIndex = 15; logIndex <= 20; logIndex++) {
+            final Pair<Integer, Long> appendPos = indexDbBeforeRestart.appendIndexAsync(logIndex, 0, IndexType.IndexSegment);
+            highestFlushPosition = Math.max(highestFlushPosition, appendPos.getSecond());
+        }
+        indexDbBeforeRestart.waitForFlush(highestFlushPosition, 100);
+        // Restart the storage to trigger recovery.
+        this.logStorage.shutdown();
+        this.logStorage.init(newLogStorageOptions());
+
+        // The index DB is expected to be truncated back to index 14.
+        final IndexDB indexDbAfterRestart = ((LogitLogStorage) this.logStorage).getIndexDB();
+        assertEquals(14, indexDbAfterRestart.getLastLogIndex());
+
+        for (int logIndex = 0; logIndex <= 14; logIndex++) {
+            final LogEntry restored = this.logStorage.getEntry(logIndex);
+            assertEquals(logIndex, restored.getId().getIndex());
+        }
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
new file mode 100644
index 0000000000..f58fa63a79
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/ConfDBTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.raft.jraft.JRaftUtils;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.ConfDB;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link ConfDB}: appends configuration entries and reads them back through the log entry iterator.
+ */
+public class ConfDBTest extends BaseLogitStorageTest {
+    private ConfDB confDB;
+    private String confStorePath;
+    private LogEntryCodecFactory logEntryCodecFactory;
+    private LogEntryDecoder decoder;
+    private LogEntryEncoder encoder;
+
+    /** Creates the "conf" directory under the test path, prepares the V1 codec and initializes the DB. */
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.confStorePath = this.path + File.separator + "conf";
+        Files.createDirectories(Path.of(confStorePath));
+        this.logEntryCodecFactory = LogEntryV1CodecFactory.getInstance();
+        decoder = this.logEntryCodecFactory.decoder();
+        encoder = this.logEntryCodecFactory.encoder();
+        this.init();
+    }
+
+    /** Creates a fresh {@link ConfDB} over {@link #confStorePath} and initializes it with the shared factory. */
+    public void init() {
+        this.confDB = new ConfDB(this.confStorePath);
+        this.confDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.confDB.shutdown();
+    }
+
+    /** Appends two configuration entries and verifies that the iterator returns exactly these two, in order. */
+    @Test
+    public void testAppendConfAndIter() throws Exception {
+        this.confDB.startServiceManager();
+        final LogEntry confEntry1 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+        confEntry1.setId(new LogId(1, 1));
+        final List<PeerId> conf1Peers = JRaftUtils.getConfiguration("localhost:8081,localhost:8082").listPeers();
+        confEntry1.setPeers(conf1Peers);
+
+        final LogEntry confEntry2 = new LogEntry(EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION);
+        confEntry2.setId(new LogId(2, 2));
+        final List<PeerId> conf2Peers = JRaftUtils.getConfiguration("localhost:8081,localhost:8082,localhost:8083")
+            .listPeers();
+        confEntry2.setPeers(conf2Peers);
+        {
+            // Wait on the position of the last append before reading.
+            this.confDB.appendLogAsync(1, this.encoder.encode(confEntry1));
+            final Pair<Integer, Long> posPair = this.confDB.appendLogAsync(2, this.encoder.encode(confEntry2));
+            this.confDB.waitForFlush(posPair.getSecond(), 100);
+        }
+        {
+            final AbstractDB.LogEntryIterator iterator = this.confDB.iterator(this.decoder);
+            // Expected value goes first so that failure messages report expected/actual correctly.
+            final LogEntry conf1 = iterator.next();
+            assertEquals(toString(conf1Peers), toString(conf1.getPeers()));
+
+            final LogEntry conf2 = iterator.next();
+            assertEquals(toString(conf2Peers), toString(conf2.getPeers()));
+
+            assertNull(iterator.next());
+        }
+    }
+
+    /**
+     * Joins the string forms of the given peers with commas.
+     *
+     * @param peers Peers to render.
+     * @return Comma-separated peer list without a trailing comma.
+     */
+    public String toString(final List<PeerId> peers) {
+        final StringBuilder sb = new StringBuilder();
+        int i = 0;
+        int size = peers.size();
+        for (final PeerId peer : peers) {
+            sb.append(peer);
+            if (i < size - 1) {
+                sb.append(",");
+            }
+            i++;
+        }
+        return sb.toString();
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
new file mode 100644
index 0000000000..9e5930b896
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/IndexDBTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.IndexDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.assit.AbortFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexType;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link IndexDB}: appending index entries, looking them up and recovering after a restart.
+ */
+public class IndexDBTest extends BaseLogitStorageTest {
+    private IndexDB indexDB;
+    private String indexStorePath;
+    private AbortFile abortFile;
+
+    /** Creates the "index" directory under the test path and initializes the DB. */
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.indexStorePath = this.path + File.separator + "index";
+        this.abortFile = new AbortFile(this.indexStorePath + File.separator + "Abort");
+        Files.createDirectories(Path.of(indexStorePath));
+        this.init();
+    }
+
+    /** Creates a fresh {@link IndexDB} over {@link #indexStorePath} and initializes it with the shared factory. */
+    public void init() {
+        this.indexDB = new IndexDB(this.indexStorePath);
+        this.indexDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.indexDB.shutdown();
+    }
+
+    /**
+     * Test for Service, which enables auto flush.
+     *
+     * <p>After this test the FileManager's file state is:
+     * <pre>
+     * fileId  fileFromOffset  firstLogIndex  lastLogIndex  fileLastOffset         wrotePosition
+     * 0       0               0              9             26 + 100 = 126         26 + 100
+     * 1       26 + 100        10             15            26 + 26 + 160 = 212    26 + 60
+     * </pre>
+     */
+    @Test
+    public void testAppendIndex() throws Exception {
+        this.indexDB.startServiceManager();
+        {
+            // Append 10 indexes to the first file, which fills it up to the file end (size: 126).
+            for (int i = 0; i < 10; i++) {
+                this.indexDB.appendIndexAsync(i, i, IndexType.IndexSegment);
+            }
+            // Write 6 indexes (10..15) to the second file, wrotePosition = 26 + 60.
+            Pair<Integer, Long> posPair = null;
+            for (int i = 10; i <= 15; i++) {
+                posPair = this.indexDB.appendIndexAsync(i, i, IndexType.IndexSegment);
+            }
+
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+
+            assertEquals(5, this.indexDB.lookupIndex(15).getOffset());
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+    /** Checks the first segment and conf positions reported for a lookup starting at log index 1. */
+    @Test
+    public void testLookupFirstLogPosFromLogIndex() {
+        this.indexDB.startServiceManager();
+        {
+            this.indexDB.appendIndexAsync(1, 1, IndexType.IndexSegment);
+            this.indexDB.appendIndexAsync(2, 2, IndexType.IndexSegment);
+            final Pair<Integer, Long> posPair = this.indexDB.appendIndexAsync(3, 3, IndexType.IndexConf);
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+        }
+
+        final Pair<Integer, Integer> posPair = this.indexDB.lookupFirstLogPosFromLogIndex(1);
+        final int firstSegmentPos = posPair.getFirst();
+        final int firstConfPos = posPair.getSecond();
+        assertEquals(1, firstSegmentPos);
+        assertEquals(3, firstConfPos);
+    }
+
+    /** Checks the last segment and conf index entries found when scanning from the tail. */
+    @Test
+    public void testLookupLastLogIndexAndPosFromTail() {
+        this.indexDB.startServiceManager();
+        {
+            this.indexDB.appendIndexAsync(1, 1, IndexType.IndexSegment);
+            this.indexDB.appendIndexAsync(2, 2, IndexType.IndexSegment);
+            this.indexDB.appendIndexAsync(3, 3, IndexType.IndexConf);
+            // Wait on the position of the LAST append so that every entry is flushed before the lookup.
+            final Pair<Integer, Long> posPair = this.indexDB.appendIndexAsync(4, 4, IndexType.IndexSegment);
+            this.indexDB.waitForFlush(posPair.getSecond(), 100);
+        }
+        final Pair<IndexFile.IndexEntry, IndexFile.IndexEntry> indexPair = this.indexDB
+            .lookupLastLogIndexAndPosFromTail();
+        final IndexFile.IndexEntry lastSegmentIndex = indexPair.getFirst();
+        final IndexFile.IndexEntry lastConfIndex = indexPair.getSecond();
+        // JUnit assertions instead of the "assert" keyword, which is skipped when the JVM runs without -ea.
+        assertEquals(4L, lastSegmentIndex.getLogIndex());
+        assertEquals(3L, lastConfIndex.getLogIndex());
+    }
+
+    /** Restarts the DB after a regular shutdown and checks that the flushed position is restored. */
+    @Test
+    public void testRecoverNormal() throws Exception {
+        this.testAppendIndex();
+        {
+            // Try to shutdown and recover, check flush position
+            this.indexDB.shutdown();
+            this.init();
+            this.indexDB.recover();
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+    /** Restarts the DB with an abort file present and checks that the flushed position is restored. */
+    @Test
+    public void testRecoverAbNormal() throws Exception {
+        this.testAppendIndex();
+        {
+            // Try to shutdown and recover, check flush position
+            this.indexDB.shutdown();
+            this.init();
+            // Create abort file
+            this.abortFile.create();
+            this.indexDB.recover();
+            assertEquals(212, this.indexDB.getFlushedPosition());
+        }
+    }
+
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
new file mode 100644
index 0000000000..13542f0bca
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/db/SegmentLogDBTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.db;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.AbstractDB;
+import org.apache.ignite.raft.jraft.storage.logit.storage.db.SegmentLogDB;
+import org.apache.ignite.raft.jraft.storage.logit.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link SegmentLogDB}: appending encoded log entries, iterating over them and recovering after a restart.
+ */
+public class SegmentLogDBTest extends BaseLogitStorageTest {
+    private SegmentLogDB segmentLogDB;
+    private String segmentStorePath;
+
+    /** Creates the "segment" directory under the test path and initializes the DB. */
+    @BeforeEach
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        this.segmentStorePath = this.path + File.separator + "segment";
+        Files.createDirectories(Path.of(this.segmentStorePath));
+        this.init();
+    }
+
+    /** Creates a fresh {@link SegmentLogDB} over {@link #segmentStorePath} and initializes it with the shared factory. */
+    public void init() {
+        this.segmentLogDB = new SegmentLogDB(this.segmentStorePath);
+        this.segmentLogDB.init(this.logStoreFactory);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+        this.segmentLogDB.shutdown();
+    }
+
+    /** Appends 20 entries of 24 encoded bytes each and checks the index range and the flushed position. */
+    @Test
+    public void testAppendLog() throws Exception {
+        this.segmentLogDB.startServiceManager();
+        // The segment file size is 300.
+        // One entry size = 24 + 6 = 30, so this case will create three log files.
+        Pair<Integer, Long> lastAppendPos = null;
+        for (int logIndex = 0; logIndex < 20; logIndex++) {
+            final byte[] encoded = genData(logIndex, 0, 24);
+            lastAppendPos = this.segmentLogDB.appendLogAsync(logIndex, encoded);
+        }
+        this.segmentLogDB.waitForFlush(lastAppendPos.getSecond(), 100);
+        assertEquals(0, this.segmentLogDB.getFirstLogIndex());
+        assertEquals(19, this.segmentLogDB.getLastLogIndex());
+        assertEquals((600 + 26 + 2 * 30), this.segmentLogDB.getFlushedPosition());
+    }
+
+    /**
+     * Iterates from position 146 and checks that the returned entries carry consecutive indexes starting at 4.
+     */
+    @Test
+    public void testIterator() throws Exception {
+        testAppendLog();
+        // Read from the fifth entry, pos = 26 + 30 * 4 = 146
+        // NOTE(review): the iterator is created with 5 while the first expected index is 4 - confirm the
+        // meaning of the second argument. The loop also never checks how many entries were returned.
+        final LogEntryDecoder decoder = LogEntryV1CodecFactory.getInstance().decoder();
+        final AbstractDB.LogEntryIterator iterator = this.segmentLogDB.iterator(decoder, 5, 146);
+        int expectedIndex = 4;
+        for (LogEntry entry = iterator.next(); entry != null; entry = iterator.next()) {
+            assertEquals(expectedIndex, entry.getId().getIndex());
+            expectedIndex++;
+        }
+    }
+
+    /**
+     * Writes three entries spanning two segment files, restarts the DB and checks the flushed position
+     * and that the entries can still be looked up.
+     */
+    @Test
+    public void testRecover() throws Exception {
+        this.segmentLogDB.startServiceManager();
+        final byte[] firstEntry = genData(1, 0, 150);
+        final byte[] secondEntry = genData(2, 0, 100);
+        final byte[] thirdEntry = genData(3, 0, 100);
+        {
+            // Write first file, one segment file size = 300
+            this.segmentLogDB.appendLogAsync(1, firstEntry);
+            this.segmentLogDB.appendLogAsync(2, secondEntry);
+            // Write second file
+            final Pair<Integer, Long> lastAppendPos = this.segmentLogDB.appendLogAsync(3, thirdEntry);
+            this.segmentLogDB.waitForFlush(lastAppendPos.getSecond(), 100);
+        }
+
+        final byte[] thirdBeforeRestart = this.segmentLogDB.lookupLog(3, this.headerSize);
+        assertArrayEquals(thirdEntry, thirdBeforeRestart);
+        {
+            this.segmentLogDB.shutdown();
+            this.init();
+            this.segmentLogDB.recover();
+            // Last flush position = one segment file size (300) + header(26) + log3Size(2 + 4 + 100) = 432
+            assertEquals(432, this.segmentLogDB.getFlushedPosition());
+        }
+        {
+            final byte[] firstAfterRestart = this.segmentLogDB.lookupLog(1, this.headerSize);
+            assertArrayEquals(firstEntry, firstAfterRestart);
+            final byte[] thirdAfterRestart = this.segmentLogDB.lookupLog(3, this.headerSize);
+            assertArrayEquals(thirdEntry, thirdAfterRestart);
+        }
+    }
+
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java
new file mode 100644
index 0000000000..b3de55fcf9
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/FileManagerTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.AbstractFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileManager;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileType;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.service.AllocateFileService;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests FileManager, using index files as the managed file type.
+ */
+public class FileManagerTest extends BaseLogitStorageTest {
+ private FileManager fileManager;
+ private AllocateFileService allocateService;
+ private String indexStorePath;
+
+ @BeforeEach
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ this.indexStorePath = this.path + File.separator + "index"; // Index files live in an "index" sub-directory of the test path
+ Files.createDirectories(Path.of(indexStorePath));
+ this.allocateService = new AllocateFileService(FileType.FILE_INDEX, indexStorePath, this.logStoreFactory);
+ this.allocateService.start();
+ this.fileManager = this.logStoreFactory.newFileManager(FileType.FILE_INDEX, indexStorePath,
+ this.allocateService);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ this.fileManager.shutdown();
+ this.allocateService.shutdown(true);
+ }
+
+ /**
+ * After writeDataToFirstFile and writeDataToSecondFile have both run,
+ * the fileManager's file state is:
+ *
+ * fileId fileFromOffset firstLogIndex lastLogIndex fileLastOffset wrotePosition
+ * 0 0 0 9 26 + 100 = 126 26 + 100
+ * 1 26 + 100 = 126 10 15 126 + 26 + 60 = 212 26 + 60
+ */
+ @Test
+ public void writeDataToFirstFile() {
+ // Append 10 index entries (0..9) to the first file, which fills it to the file end (size: 26 + 100 = 126)
+ {
+ for (int i = 0; i < 10; i++) {
+ final AbstractFile lastFile = this.fileManager.getLastFile(i, 10, true);
+ assert (lastFile instanceof IndexFile); // NOTE(review): plain Java assert, only checked when the JVM runs with -ea
+ final IndexFile indexFile = (IndexFile) lastFile;
+ indexFile.appendIndex(i, i, segmentIndex);
+ }
+ }
+ }
+
+ @Test
+ public void writeDataToSecondFile() {
+ writeDataToFirstFile();
+
+ // Get the last file again: this time it is a new blank file (from the allocator)
+ final AbstractFile lastFile = this.fileManager.getLastFile(10, 10, true);
+ assertEquals(this.indexFileSize, lastFile.getFileFromOffset());
+
+ // Write 6 index entries (10..15) to the second file, wrotePosition = 26 + 60
+ final IndexFile indexFile = (IndexFile) lastFile;
+ for (int i = 10; i <= 15; i++) {
+ indexFile.appendIndex(i, i, segmentIndex);
+ }
+ }
+
+ @Test
+ public void testFindAbstractFileByOffset() {
+ writeDataToSecondFile();
+ {
+ // Offset 30 lies inside the first file, whose fileFromOffset is 0
+ final AbstractFile firstFile = this.fileManager.findFileByOffset(30, false);
+ assertEquals(0, firstFile.getFileFromOffset());
+ // Offset indexFileSize + 10 (136) lies inside the second file
+ final AbstractFile secondFile = this.fileManager.findFileByOffset(this.indexFileSize + 10, false);
+ assertEquals(this.indexFileSize, secondFile.getFileFromOffset());
+ }
+ }
+
+ @Test
+ public void testFlush() {
+ writeDataToSecondFile();
+
+ {
+ // First flush: flush position = indexFileSize (126)
+ this.fileManager.flush();
+ assertEquals(126, this.fileManager.getFlushedPosition());
+ }
+
+ {
+ // Second flush: flush position = 126 + 26 + 60 = 212
+ this.fileManager.flush();
+ assertEquals(212, this.fileManager.getFlushedPosition());
+ }
+ }
+
+ @Test
+ public void testTruncateSuffix() {
+ // Flush position after testFlush = 212
+ testFlush();
+ // Truncate the suffix to logIndex = 10: flush position becomes 126 + 26 + 10 = 162
+ this.fileManager.truncateSuffix(10, 0);
+ assertEquals(162, this.fileManager.getFlushedPosition());
+ assertEquals(10, this.fileManager.getLastLogIndex());
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
new file mode 100644
index 0000000000..0821aa78ea
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/index/IndexFileTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file.index;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.index.IndexFile.IndexEntry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class IndexFileTest extends BaseLogitStorageTest {
+
+ private static final int FILE_SIZE = 10 * 10 + FileHeader.HEADER_SIZE; // Header plus 100 bytes; presumably 10 index entries of 10 bytes each
+ private IndexFile offsetIndex;
+
+ @BeforeEach
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ this.init();
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ this.offsetIndex.shutdown(1000);
+ }
+
+ private void init() { // Also called mid-test to re-open the same file after a shutdown
+ final String filePath = this.path + File.separator + "IndexFileTest";
+ this.offsetIndex = new IndexFile(filePath, FILE_SIZE);
+ }
+
+ private final IndexEntry appendEntry0 = new IndexEntry(0, 1);
+ private final IndexEntry appendEntry1 = new IndexEntry(1, 2);
+ private final IndexEntry appendEntry2 = new IndexEntry(2, 3);
+
+ @Test
+ public void testAppendIndex() {
+ this.offsetIndex.appendIndex(appendEntry0.getOffset(), appendEntry0.getPosition(), segmentIndex);
+ this.offsetIndex.appendIndex(appendEntry1.getOffset(), appendEntry1.getPosition(), segmentIndex);
+ this.offsetIndex.appendIndex(appendEntry2.getOffset(), appendEntry2.getPosition(), segmentIndex);
+ this.offsetIndex.flush();
+ assertEquals(this.offsetIndex.getLastLogIndex(), appendEntry2.getOffset()); // NOTE(review): arguments are in (actual, expected) order
+ }
+
+ @Test
+ public void testLooUp() { // NOTE(review): name is presumably a typo for testLookUp
+ testAppendIndex();
+
+ final IndexEntry entry0 = this.offsetIndex.lookupIndex(appendEntry0.getOffset());
+ assertEquals(appendEntry0.getOffset(), entry0.getOffset());
+
+ final IndexEntry entry1 = this.offsetIndex.lookupIndex(appendEntry1.getOffset());
+ assertEquals(appendEntry1.getOffset(), entry1.getOffset());
+
+ final IndexEntry entry2 = this.offsetIndex.lookupIndex(appendEntry2.getOffset());
+ assertEquals(appendEntry2.getOffset(), entry2.getOffset());
+ }
+
+ @Test
+ public void testTruncate() {
+ // Append 10 index entries (log indexes 1..10)
+ for (int idx = 1; idx <= 10; idx++) {
+ this.offsetIndex.appendIndex(idx, idx, segmentIndex);
+ }
+
+ // Truncate at log index 9: the last remaining log index must be 8
+ {
+ this.offsetIndex.truncate(9, 0);
+ assertEquals(8, this.offsetIndex.getLastLogIndex());
+ }
+
+ // Truncate at log index 5: the last remaining log index must be 4
+ {
+ this.offsetIndex.truncate(5, 0);
+ // Re-open the file and recover: the truncation must survive the restart
+ this.offsetIndex.shutdown(1000);
+ this.init();
+ this.offsetIndex.recover();
+ assertEquals(4, this.offsetIndex.getLastLogIndex());
+ }
+ }
+
+ @Test
+ public void testRecoverFromInvalidData() throws Exception {
+ testAppendIndex();
+ // Re-open the intact file: all three entries must be recovered
+ {
+ this.offsetIndex.shutdown(1000);
+ this.init();
+ this.offsetIndex.recover();
+ assertEquals(this.headerSize + 30, this.offsetIndex.getWrotePosition());
+ assertEquals(2, this.offsetIndex.getLastLogIndex());
+
+ // Lookup still works, all data is valid.
+ final IndexEntry entry1 = this.offsetIndex.lookupIndex(appendEntry1.getOffset());
+ assertEquals(appendEntry1.getOffset(), entry1.getOffset());
+ }
+ {
+ this.offsetIndex.shutdown(1000);
+ // Cut the file at pos = headerSize + 20 (46): the third entry is gone, so recovery must stop at the second one.
+ try (FileOutputStream out = new FileOutputStream(new File(this.offsetIndex.getFilePath()), true);
+ FileChannel outChan = out.getChannel()) {
+ outChan.truncate(this.headerSize + 20);
+ out.flush();
+ }
+ this.init();
+ this.offsetIndex.recover();
+ assertEquals(this.headerSize + 20, this.offsetIndex.getWrotePosition());
+ assertEquals(1, this.offsetIndex.getLastLogIndex());
+ }
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
new file mode 100644
index 0000000000..4463927239
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/logit/file/segment/SegmentFileTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.logit.file.segment;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.raft.jraft.storage.logit.BaseLogitStorageTest;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.FileHeader;
+import org.apache.ignite.raft.jraft.storage.logit.storage.file.segment.SegmentFile;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SegmentFileTest extends BaseLogitStorageTest {
+
+ private static final int FILE_SIZE = 64 + FileHeader.HEADER_SIZE; // Header plus 64 bytes of record space
+ private SegmentFile segmentFile;
+
+ @BeforeEach
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ this.init();
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ this.segmentFile.shutdown(1000);
+ }
+
+ private void init() { // Also called mid-test to re-open the same file after a shutdown
+ final String filePath = this.path + File.separator + "IndexFileTest"; // NOTE(review): file name looks copy-pasted from IndexFileTest
+ this.segmentFile = new SegmentFile(filePath, FILE_SIZE);
+ }
+
+ @Test
+ public void testAppendDataAndRead() {
+ {
+ // Write 32 bytes of data; the record takes 38 bytes in the file (the next write position is header + 38)
+ final byte[] data = genData(0, 0, 32);
+ int firstWritePos = FileHeader.HEADER_SIZE;
+ assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data)));
+ assertEquals(firstWritePos, this.segmentFile.appendData(0, data));
+ // Data can't be read back before it is flushed
+ this.segmentFile.flush();
+ assertArrayEquals(data, this.segmentFile.lookupData(0, firstWritePos));
+ }
+
+ {
+ // Write 20 bytes of data; presumably 6 + 20 = 26 bytes in the file, which with the first 38 fills the 64 bytes
+ final byte[] data2 = genData(1, 0, 20);
+ int nextWrotePos = FileHeader.HEADER_SIZE + 38;
+ assertFalse(this.segmentFile.reachesFileEndBy(SegmentFile.getWriteBytes(data2)));
+ assertEquals(nextWrotePos, this.segmentFile.appendData(1, data2));
+ // Data can't be read back before it is flushed
+ this.segmentFile.flush();
+ assertArrayEquals(data2, this.segmentFile.lookupData(1, nextWrotePos));
+ }
+ }
+
+ @Test
+ public void testRecoverFromInvalidData() throws IOException {
+ testAppendDataAndRead();
+ int firstWritePos = FileHeader.HEADER_SIZE;
+ {
+ // Restart the segment file with intact contents: all data is valid.
+ this.segmentFile.shutdown(1000);
+ this.init();
+ this.segmentFile.recover();
+ assertEquals(32, this.segmentFile.lookupData(0, firstWritePos).length);
+ assertEquals(20, this.segmentFile.lookupData(1, 38 + firstWritePos).length);
+ }
+
+ {
+ this.segmentFile.shutdown(1000);
+ try (FileOutputStream out = new FileOutputStream(new File(this.segmentFile.getFilePath()), true);
+ FileChannel outChan = out.getChannel()) {
+ // Cut the file at pos = 64: the second record is lost and must be dropped when recovering.
+ outChan.truncate(64);
+ }
+ this.init();
+ this.segmentFile.recover();
+ // The first record is still valid
+ assertEquals(32, this.segmentFile.lookupData(0, firstWritePos).length);
+ // The second record is gone, so the lookup returns null
+ assertNull(this.segmentFile.lookupData(1, 38 + firstWritePos));
+ }
+ }
+
+ @Test
+ public void testTruncate() {
+ testAppendDataAndRead();
+ int truncatePos = FileHeader.HEADER_SIZE + 38; // Start position of the second record (log index 1)
+ this.segmentFile.truncate(1, truncatePos);
+
+ // Re-open the file and recover: only log index 0 must remain
+ this.segmentFile.shutdown(1000);
+ this.init();
+ this.segmentFile.recover();
+
+ assertEquals(0, this.segmentFile.getLastLogIndex());
+ }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
index 4753c1947f..3dbc1d752f 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotMetaTableTest.java
@@ -52,7 +52,7 @@ public class LocalSnapshotMetaTableTest {
public void testAddRemove() {
LocalFileMetaOutter.LocalFileMeta meta = msgFactory.localFileMeta()
.checksum("test")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertEquals(0, table.listFiles().size());
assertTrue(this.table.addFile("data", meta));
@@ -70,12 +70,12 @@ public class LocalSnapshotMetaTableTest {
public void testSaveLoadFile(@WorkDirectory Path workDir) throws IOException {
LocalFileMetaOutter.LocalFileMeta meta1 = msgFactory.localFileMeta()
.checksum("data1")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertTrue(this.table.addFile("data1", meta1));
LocalFileMetaOutter.LocalFileMeta meta2 = msgFactory.localFileMeta()
.checksum("data2")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertTrue(this.table.addFile("data2", meta2));
@@ -105,12 +105,12 @@ public class LocalSnapshotMetaTableTest {
public void testSaveLoadIoBuffer() throws Exception {
LocalFileMetaOutter.LocalFileMeta meta1 = msgFactory.localFileMeta()
.checksum("data1")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertTrue(this.table.addFile("data1", meta1));
LocalFileMetaOutter.LocalFileMeta meta2 = msgFactory.localFileMeta()
.checksum("data2")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertTrue(this.table.addFile("data2", meta2));
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
index 10b9a4cc89..0e1e3f4dc7 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/LocalSnapshotWriterTest.java
@@ -66,14 +66,14 @@ public class LocalSnapshotWriterTest extends BaseStorageTest {
LocalFileMetaOutter.LocalFileMeta meta = opts.getRaftMessagesFactory()
.localFileMeta()
.checksum("test")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
assertTrue(this.writer.addFile("data1", meta));
assertTrue(this.writer.addFile("data2"));
assertEquals(meta, this.writer.getFileMeta("data1"));
assertNull(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).checksum());
- assertFalse(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).hasUserMeta());
+ assertNull(((LocalFileMetaOutter.LocalFileMeta) this.writer.getFileMeta("data2")).userMeta());
this.writer.sync();
//create a new writer
@@ -82,7 +82,7 @@ public class LocalSnapshotWriterTest extends BaseStorageTest {
assertNotSame(writer, newWriter);
assertEquals(meta, newWriter.getFileMeta("data1"));
assertNull(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).checksum());
- assertFalse(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).hasUserMeta());
+ assertNull(((LocalFileMetaOutter.LocalFileMeta) newWriter.getFileMeta("data2")).userMeta());
}
@Test
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
index 705c75fa91..24cd8506af 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/snapshot/local/SnapshotFileReaderTest.java
@@ -60,7 +60,7 @@ public class SnapshotFileReaderTest extends BaseStorageTest {
final LocalFileMetaOutter.LocalFileMeta meta = opts.getRaftMessagesFactory()
.localFileMeta()
.checksum("test")
- .source(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL)
+ .sourceNumber(LocalFileMetaOutter.FileSource.FILE_SOURCE_LOCAL.getNumber())
.build();
this.metaTable.addFile("data", meta);
return meta;