You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/20 03:04:05 UTC

[iotdb] branch native_raft updated: add disk log compression

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 6b0f19c97a add disk log compression
6b0f19c97a is described below

commit 6b0f19c97aea1b5267766e746a196dce622cf14a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 20 11:06:03 2023 +0800

    add disk log compression
---
 .../consensus/natraft/protocol/RaftMember.java     |   2 +-
 .../log/appender/SlidingWindowLogAppender.java     |   6 +-
 .../protocol/log/dispatch/VotingLogList.java       |   8 +-
 .../protocol/log/manager/RaftLogManager.java       |  26 ++-
 .../manager/serialization/IndexFileDescriptor.java |  53 +++++
 .../serialization/SyncLogDequeSerializer.java      | 255 ++++++++++-----------
 .../log/sequencing/SynchronousSequencer.java       |   2 +-
 .../iotdb/consensus/natraft/utils/Timer.java       |  14 +-
 8 files changed, 209 insertions(+), 157 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index bbbc5a11c4..4aef915e65 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1222,7 +1222,7 @@ public class RaftMember {
       e.setCurrLogIndex(lastIndex + 1);
       e.setPrevTerm(lastTerm);
 
-      logManager.append(Collections.singletonList(e));
+      logManager.append(Collections.singletonList(e), true);
       votingEntry = LogUtils.buildVotingLog(e, this);
 
       setNewNodes(newNodes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index cd84e82df1..6157ab6ea1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -148,7 +148,7 @@ public class SlidingWindowLogAppender implements LogAppender {
       }
     }
     if (success != -1) {
-      moveWindowRightward(flushPos);
+      moveWindowRightward(flushPos, logs.get(logs.size() - 1).getCurrLogIndex());
     }
     result.status = Response.RESPONSE_STRONG_ACCEPT;
     result.setLastLogIndex(firstPosPrevIndex);
@@ -156,13 +156,13 @@ public class SlidingWindowLogAppender implements LogAppender {
     return success;
   }
 
-  private void moveWindowRightward(int step) {
+  private void moveWindowRightward(int step, long newIndex) {
     System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
     System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
     for (int i = 1; i <= step; i++) {
       logWindow[windowCapacity - i] = null;
     }
-    firstPosPrevIndex = logManager.getLastLogIndex();
+    firstPosPrevIndex = newIndex;
   }
 
   private void moveWindowLeftward(int step) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 15f845abbe..d19c6d33d3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -48,11 +48,11 @@ public class VotingLogList {
 
   private boolean tryCommit(VotingEntry entry) {
     RaftLogManager logManager = member.getLogManager();
-
-    if (computeNewCommitIndex(entry)
-        && logManager != null
-        && newCommitIndex.get() > logManager.getCommitLogIndex()) {
+    boolean commitIndexUpdated = computeNewCommitIndex(entry);
+    if (commitIndexUpdated && newCommitIndex.get() > logManager.getCommitLogIndex()) {
       try {
+        Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_BEFORE_COMMIT.add(
+            System.nanoTime() - entry.getEntry().createTime);
         logManager.commitTo(newCommitIndex.get());
       } catch (LogExecutionException e) {
         logger.error("Fail to commit {}", newCommitIndex, e);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 2c3ffb488e..5f2c4f6f04 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -360,7 +360,7 @@ public abstract class RaftLogManager {
 
         } else {
           long offset = lastIndex + 1;
-          append(entries.subList((int) (ci - offset), entries.size()));
+          append(entries.subList((int) (ci - offset), entries.size()), false);
         }
         return newLastIndex;
       }
@@ -377,7 +377,7 @@ public abstract class RaftLogManager {
    * @param appendingEntries appendingEntries
    * @return the newly generated lastIndex
    */
-  public long append(List<Entry> appendingEntries) {
+  public long append(List<Entry> appendingEntries, boolean isLeader) {
     if (entries.isEmpty()) {
       return getLastLogIndex();
     }
@@ -410,11 +410,16 @@ public abstract class RaftLogManager {
       }
     }
 
-    Object logUpdateCondition =
-        getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
-    synchronized (logUpdateCondition) {
-      logUpdateCondition.notifyAll();
+    if (!isLeader) {
+      // the log update condition informs follower appending threads that the log is updated; the
+      // leader does not need this notification
+      Object logUpdateCondition =
+          getLogUpdateCondition(entries.get(entries.size() - 1).getCurrLogIndex());
+      synchronized (logUpdateCondition) {
+        logUpdateCondition.notifyAll();
+      }
     }
+
     return getLastLogIndex();
   }
 
@@ -425,7 +430,7 @@ public abstract class RaftLogManager {
    * @param term the entry's term which index is leaderCommit in leader's log module
    * @return true or false
    */
-  public synchronized boolean maybeCommit(long leaderCommit, long term) {
+  public boolean maybeCommit(long leaderCommit, long term) {
     if (leaderCommit > commitIndex && matchTerm(term, leaderCommit)) {
       try {
         commitTo(leaderCommit);
@@ -615,7 +620,12 @@ public abstract class RaftLogManager {
         return;
       }
 
-      removedCommitted(entries);
+      for (Entry entry : entries) {
+        if (entry.createTime != 0) {
+          Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT.add(
+              System.nanoTime() - entry.createTime);
+        }
+      }
       checkCompaction(entries);
       commitEntries(entries);
       applyEntries(entries);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java
new file mode 100644
index 0000000000..3ec30ce1f9
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/IndexFileDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.FILE_NAME_PART_LENGTH;
+
+public class IndexFileDescriptor {
+
+  private static final Logger logger = LoggerFactory.getLogger(IndexFileDescriptor.class);
+  File file;
+  long startIndex;
+  long endIndex;
+
+  public IndexFileDescriptor(File file) {
+    this.file = file;
+    String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
+    if (splits.length != FILE_NAME_PART_LENGTH) {
+      logger.error(
+          "file={} name should be in the following format: startLogIndex-endLogIndex-version-idx",
+          file.getAbsoluteFile());
+    }
+    this.startIndex = Long.parseLong(splits[0]);
+    this.endIndex = Long.parseLong(splits[1]);
+  }
+
+  public IndexFileDescriptor(File file, long startIndex, long endIndex) {
+    this.file = file;
+    this.startIndex = startIndex;
+    this.endIndex = endIndex;
+  }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index d330f4421e..e348541ce2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -21,13 +21,15 @@ package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.file.SystemFileFactory;
-import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
 import org.apache.iotdb.consensus.natraft.protocol.HardState;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.utils.BytesUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -73,7 +75,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private List<File> logDataFileList;
 
   /** the log index files */
-  private List<File> logIndexFileList;
+  private List<IndexFileDescriptor> logIndexFileList;
 
   private LogParser parser = LogParser.getINSTANCE();
   private File metaFile;
@@ -109,7 +111,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    *
    * <p>for log index file: ${startLogIndex}-${endLogIndex}-{version}-idx
    */
-  private static final int FILE_NAME_PART_LENGTH = 4;
+  static final int FILE_NAME_PART_LENGTH = 4;
 
   private int maxRaftLogIndexSizeInMemory;
 
@@ -130,11 +132,13 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    */
   private long firstLogIndex = 0;
 
+  private long lastLogIndex = 0;
+
   /**
-   * the offset of the log's index, for example, the first value is the offset of index
-   * ${firstLogIndex}, the second value is the offset of index ${firstLogIndex+1}
+   * pairs of (last log index of a flushed block, offset of that block in the data file); one
+   * pair is added per flush of the log buffer, so a pair may cover several logs
    */
-  private List<Long> logIndexOffsetList;
+  private List<Pair<Long, Long>> logIndexOffsetList;
 
   private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
 
@@ -143,6 +147,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private volatile boolean isClosed = false;
   private RaftConfig config;
+  private ICompressor compressor = ICompressor.getCompressor(CompressionType.SNAPPY);
+  private IUnCompressor unCompressor = IUnCompressor.getUnCompressor(CompressionType.SNAPPY);
 
   private void initCommonProperties() {
     logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
@@ -177,18 +183,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
             TimeUnit.SECONDS);
   }
 
-  /**
-   * for log tools
-   *
-   * @param logPath log dir path
-   */
-  public SyncLogDequeSerializer(String logPath, RaftConfig config) {
-    this.config = config;
-    logDir = logPath + File.separator;
-    initCommonProperties();
-    initMetaAndLogFiles();
-  }
-
   /**
    * log in disk is [size of log1 | log1 buffer] [size of log2 | log2 buffer]
    *
@@ -286,24 +280,17 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       logDataBuffer.mark();
       logIndexBuffer.mark();
       ByteBuffer logData = log.serialize();
-      int size = logData.remaining() + Integer.BYTES;
       try {
-        logDataBuffer.putInt(logData.remaining());
         logDataBuffer.put(logData);
-        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
-        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
-        offsetOfTheCurrentLogDataOutputStream += size;
+        lastLogIndex = log.getCurrLogIndex();
       } catch (BufferOverflowException e) {
-        logger.info("Raft log buffer overflow!");
+        logger.debug("Raft log buffer overflow!");
         logDataBuffer.reset();
         logIndexBuffer.reset();
         flushLogBuffer();
         checkCloseCurrentFile(log.getCurrLogIndex() - 1);
-        logDataBuffer.putInt(logData.capacity());
         logDataBuffer.put(logData);
-        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
-        logIndexOffsetList.add(offsetOfTheCurrentLogDataOutputStream);
-        offsetOfTheCurrentLogDataOutputStream += size;
+        lastLogIndex = log.getCurrLogIndex();
       }
     }
   }
@@ -350,26 +337,28 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
     if (currentLogIndexOutputStream != null) {
       currentLogIndexOutputStream.close();
-      logger.info("{}: Closed a log index file {}", this, getCurrentLogIndexFile());
+      logger.info("{}: Closed a log index file {}", this, getCurrentLogIndexFileDescriptor());
       currentLogIndexOutputStream = null;
 
-      File currentLogIndexFile = getCurrentLogIndexFile();
+      IndexFileDescriptor currentLogIndexFile = getCurrentLogIndexFileDescriptor();
       String newIndexFileName =
           currentLogIndexFile
+              .file
               .getName()
               .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(commitIndex));
       File newCurrentLogIndexFile =
           SystemFileFactory.INSTANCE.getFile(
-              currentLogIndexFile.getParent() + File.separator + newIndexFileName);
-      if (!currentLogIndexFile.renameTo(newCurrentLogIndexFile)) {
-        logger.error("rename log index file={} failed", currentLogIndexFile.getAbsoluteFile());
+              currentLogIndexFile.file.getParent() + File.separator + newIndexFileName);
+      if (!currentLogIndexFile.file.renameTo(newCurrentLogIndexFile)) {
+        logger.error("rename log index file={} failed", currentLogIndexFile.file.getAbsoluteFile());
       }
       logger.debug(
           "rename index file={} to file={}",
-          currentLogIndexFile.getAbsoluteFile(),
+          currentLogIndexFile.file.getAbsoluteFile(),
           newCurrentLogIndexFile.getAbsoluteFile());
 
-      logIndexFileList.set(logIndexFileList.size() - 1, newCurrentLogIndexFile);
+      logIndexFileList.get(logIndexFileList.size() - 1).file = newCurrentLogIndexFile;
+      logIndexFileList.get(logIndexFileList.size() - 1).endIndex = commitIndex;
     }
 
     offsetOfTheCurrentLogDataOutputStream = 0;
@@ -386,10 +375,19 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       try {
         checkStream();
         // 1. write to the log data file
-        ReadWriteIOUtils.writeWithoutSize(
-            logDataBuffer, 0, logDataBuffer.position(), currentLogDataOutputStream);
+        byte[] compressed =
+            compressor.compress(
+                logDataBuffer.array(),
+                logDataBuffer.arrayOffset() + logDataBuffer.position(),
+                logDataBuffer.remaining());
+        ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+        currentLogDataOutputStream.write(compressed);
+        logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
+        logIndexBuffer.putLong(lastLogIndex);
+        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
         ReadWriteIOUtils.writeWithoutSize(
             logIndexBuffer, 0, logIndexBuffer.position(), currentLogIndexOutputStream);
+        offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
         if (config.getFlushRaftLogThreshold() == 0) {
           currentLogDataOutputStream.getChannel().force(true);
           currentLogIndexOutputStream.getChannel().force(true);
@@ -483,7 +481,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
     // sort by name before recover
     logDataFileList.sort(this::comparePersistLogFileName);
-    logIndexFileList.sort(this::comparePersistLogFileName);
+    logIndexFileList.sort(
+        (descriptor1, descriptor2) ->
+            comparePersistLogFileName(descriptor1.file, descriptor2.file));
 
     // 3. recover the last log file in case of abnormal exit
     recoverTheLastLogFile();
@@ -506,7 +506,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
             logDataFileList.add(file);
             break;
           case LOG_INDEX_FILE_SUFFIX:
-            logIndexFileList.add(file);
+            logIndexFileList.add(new IndexFileDescriptor(file));
             break;
           default:
             logger.error("unknown file type={}", logFileType);
@@ -565,19 +565,21 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       return;
     }
 
-    File lastIndexFile = logIndexFileList.get(logIndexFileList.size() - 1);
-    long endIndex = Long.parseLong(lastIndexFile.getName().split(FILE_NAME_SEPARATOR)[1]);
+    IndexFileDescriptor lastIndexFileDescriptor = logIndexFileList.get(logIndexFileList.size() - 1);
+    long endIndex = lastIndexFileDescriptor.endIndex;
     boolean success = true;
     if (endIndex != Long.MAX_VALUE) {
-      logger.info("last log index file={} no need to recover", lastIndexFile.getAbsoluteFile());
+      logger.info(
+          "last log index file={} no need to recover",
+          lastIndexFileDescriptor.file.getAbsoluteFile());
     } else {
-      success = recoverTheLastLogIndexFile(lastIndexFile);
+      success = recoverTheLastLogIndexFile(lastIndexFileDescriptor);
     }
 
     if (!success) {
       logger.error(
           "recover log index file failed, clear all logs in disk, {}",
-          lastIndexFile.getAbsoluteFile());
+          lastIndexFileDescriptor.file.getAbsoluteFile());
       forceDeleteAllLogFiles();
       clearFirstLogIndex();
       return;
@@ -603,9 +605,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private boolean recoverTheLastLogDataFile(File file) {
     String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
     long startIndex = Long.parseLong(splits[0]);
-    Pair<File, Pair<Long, Long>> fileStartAndEndIndex = getLogIndexFile(startIndex);
-    if (fileStartAndEndIndex.right.left == startIndex) {
-      long endIndex = fileStartAndEndIndex.right.right;
+    IndexFileDescriptor descriptor = getLogIndexFile(startIndex);
+    if (descriptor.startIndex == startIndex) {
+      long endIndex = descriptor.endIndex;
       String newDataFileName =
           file.getName().replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
       File newLogDataFile =
@@ -620,30 +622,31 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return false;
   }
 
-  private boolean recoverTheLastLogIndexFile(File file) {
-    logger.debug("start to recover the last log index file={}", file.getAbsoluteFile());
-    String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
-    long startIndex = Long.parseLong(splits[0]);
+  private boolean recoverTheLastLogIndexFile(IndexFileDescriptor descriptor) {
+    logger.debug("start to recover the last log index file={}", descriptor.file.getAbsoluteFile());
+    long startIndex = descriptor.startIndex;
     int longLength = 8;
     byte[] bytes = new byte[longLength];
 
-    int totalCount = 0;
+    long index;
     long offset = 0;
-    try (FileInputStream fileInputStream = new FileInputStream(file);
+    long endIndex = 0;
+    try (FileInputStream fileInputStream = new FileInputStream(descriptor.file);
         BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
       firstLogIndex = startIndex;
       while (bufferedInputStream.read(bytes) != -1) {
+        index = BytesUtils.bytesToLong(bytes);
+        bufferedInputStream.read(bytes);
         offset = BytesUtils.bytesToLong(bytes);
-        logIndexOffsetList.add(offset);
-        totalCount++;
+        logIndexOffsetList.add(new Pair<>(index, offset));
+        endIndex = index;
       }
     } catch (IOException e) {
       logger.error("recover log index file failed,", e);
     }
-    long endIndex = startIndex + totalCount - 1;
     logger.debug(
         "recover log index file={}, startIndex={}, endIndex={}",
-        file.getAbsoluteFile(),
+        descriptor.file.getAbsoluteFile(),
         startIndex,
         endIndex);
 
@@ -658,15 +661,21 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
     if (endIndex >= startIndex) {
       String newIndexFileName =
-          file.getName().replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
+          descriptor
+              .file
+              .getName()
+              .replaceAll(String.valueOf(Long.MAX_VALUE), String.valueOf(endIndex));
       File newLogIndexFile =
-          SystemFileFactory.INSTANCE.getFile(file.getParent() + File.separator + newIndexFileName);
-      if (!file.renameTo(newLogIndexFile)) {
-        logger.error("rename log index file={} failed when recover", file.getAbsoluteFile());
+          SystemFileFactory.INSTANCE.getFile(
+              descriptor.file.getParent() + File.separator + newIndexFileName);
+      if (!descriptor.file.renameTo(newLogIndexFile)) {
+        logger.error(
+            "rename log index file={} failed when recover", descriptor.file.getAbsoluteFile());
       }
-      logIndexFileList.set(logIndexFileList.size() - 1, newLogIndexFile);
+      descriptor.file = newLogIndexFile;
+      descriptor.endIndex = endIndex;
     } else {
-      logger.error("recover log index file failed,{}", file.getAbsoluteFile());
+      logger.error("recover log index file failed,{}", descriptor.file.getAbsoluteFile());
       return false;
     }
     return true;
@@ -733,8 +742,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
 
     if (currentLogIndexOutputStream == null) {
-      currentLogIndexOutputStream = new FileOutputStream(getCurrentLogIndexFile(), true);
-      logger.info("{}: Opened a new index data file: {}", this, getCurrentLogIndexFile());
+      currentLogIndexOutputStream =
+          new FileOutputStream(getCurrentLogIndexFileDescriptor().file, true);
+      logger.info("{}: Opened a new index data file: {}", this, getCurrentLogIndexFileDescriptor());
     }
   }
 
@@ -766,7 +776,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         logger.warn("Cannot create new log index file {}", logDataFile);
       }
       logDataFileList.add(logDataFile);
-      logIndexFileList.add(logIndexFile);
+      logIndexFileList.add(new IndexFileDescriptor(logIndexFile, startLogIndex, endLogIndex));
     } finally {
       lock.unlock();
     }
@@ -776,7 +786,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return logDataFileList.get(logDataFileList.size() - 1);
   }
 
-  private File getCurrentLogIndexFile() {
+  private IndexFileDescriptor getCurrentLogIndexFileDescriptor() {
     return logIndexFileList.get(logIndexFileList.size() - 1);
   }
 
@@ -1025,7 +1035,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
 
     File logDataFile = null;
-    File logIndexFile = null;
+    IndexFileDescriptor logIndexFile = null;
 
     lock.lock();
     try {
@@ -1036,18 +1046,18 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         return false;
       }
       Files.delete(logDataFile.toPath());
-      Files.delete(logIndexFile.toPath());
+      Files.delete(logIndexFile.file.toPath());
       logDataFileList.remove(0);
       logIndexFileList.remove(0);
       logger.debug(
           "delete date file={}, index file={}",
           logDataFile.getAbsoluteFile(),
-          logIndexFile.getAbsoluteFile());
+          logIndexFile.file.getAbsoluteFile());
     } catch (IOException e) {
       logger.error(
           "delete file failed, data file={}, index file={}",
           logDataFile.getAbsoluteFile(),
-          logIndexFile.getAbsoluteFile());
+          logIndexFile.file.getAbsoluteFile());
       return false;
     } finally {
       lock.unlock();
@@ -1095,9 +1105,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       startIndex = 0;
     }
 
-    long newEndIndex = endIndex;
+    long newEndIndex;
     if (endIndex - startIndex > maxNumberOfLogsPerFetchOnDisk) {
       newEndIndex = startIndex + maxNumberOfLogsPerFetchOnDisk;
+    } else {
+      newEndIndex = endIndex;
     }
     logger.debug(
         "intend to get logs between[{}, {}], actually get logs between[{},{}]",
@@ -1122,6 +1134,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       for (Pair<File, Pair<Long, Long>> pair : logDataFileAndOffsetList) {
         result.addAll(getLogsFromOneLogDataFile(pair.left, pair.right));
       }
+      long finalStartIndex = startIndex;
+      result.removeIf(
+          e -> !(finalStartIndex <= e.getCurrLogIndex() && e.getCurrLogIndex() <= newEndIndex));
 
       return result;
     } finally {
@@ -1147,16 +1162,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
     // 1. first find in memory
     if (logIndex >= firstLogIndex) {
-      int arrayIndex = (int) (logIndex - firstLogIndex);
-      if (arrayIndex < logIndexOffsetList.size()) {
-        offset = logIndexOffsetList.get(arrayIndex);
-        logger.debug(
-            "found the offset in memory, logIndex={}, firstLogIndex={}, logIndexOffsetList size={}, offset={}",
-            logIndex,
-            firstLogIndex,
-            logIndexOffsetList.size(),
-            offset);
-        return offset;
+      for (Pair<Long, Long> indexOffset : logIndexOffsetList) {
+        // left is the last index of a block: the first block ending at/after logIndex holds it
+        if (indexOffset.left >= logIndex) {
+          return indexOffset.right;
+        }
       }
     }
 
@@ -1167,36 +1177,30 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         logIndexOffsetList.size());
 
     // 2. second read the log index file
-    Pair<File, Pair<Long, Long>> fileWithStartAndEndIndex = getLogIndexFile(logIndex);
-    if (fileWithStartAndEndIndex == null) {
+    IndexFileDescriptor descriptor = getLogIndexFile(logIndex);
+    if (descriptor == null) {
       return -1;
     }
-    File file = fileWithStartAndEndIndex.left;
-    Pair<Long, Long> startAndEndIndex = fileWithStartAndEndIndex.right;
+    File file = descriptor.file;
     logger.debug(
         "start to read the log index file={} for log index={}, file size={}",
         file.getAbsoluteFile(),
         logIndex,
         file.length());
+    long endIndex;
     try (FileInputStream fileInputStream = new FileInputStream(file);
         BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)) {
-      long bytesNeedToSkip = (logIndex - startAndEndIndex.left) * (Long.BYTES);
-      long bytesActuallySkip = bufferedInputStream.skip(bytesNeedToSkip);
-      logger.debug("skip {} bytes when read file={}", bytesActuallySkip, file.getAbsoluteFile());
-      if (bytesNeedToSkip != bytesActuallySkip) {
-        logger.error(
-            "read file={} failed, should skip={}, actually skip={}",
-            file.getAbsoluteFile(),
-            bytesNeedToSkip,
-            bytesActuallySkip);
-        return -1;
+      while (bufferedInputStream.available() > 0) {
+        endIndex = ReadWriteIOUtils.readLong(bufferedInputStream);
+        offset = ReadWriteIOUtils.readLong(bufferedInputStream);
+        if (endIndex >= logIndex) {
+          return offset;
+        }
       }
-      offset = ReadWriteIOUtils.readLong(bufferedInputStream);
-      return offset;
     } catch (IOException e) {
       logger.error("can not read the log index file={}", file.getAbsoluteFile(), e);
-      return -1;
     }
+    return -1;
   }
 
   /**
@@ -1270,16 +1274,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    *     second pair's first value is the file's start log index. the second pair's second value is
    *     the file's end log index. null if not found
    */
-  public Pair<File, Pair<Long, Long>> getLogIndexFile(long startIndex) {
-    for (File file : logIndexFileList) {
-      String[] splits = file.getName().split(FILE_NAME_SEPARATOR);
-      if (splits.length != FILE_NAME_PART_LENGTH) {
-        logger.error(
-            "file={} name should be in the following format: startLogIndex-endLogIndex-version-idx",
-            file.getAbsoluteFile());
-      }
-      if (Long.parseLong(splits[0]) <= startIndex && startIndex <= Long.parseLong(splits[1])) {
-        return new Pair<>(file, new Pair<>(Long.parseLong(splits[0]), Long.parseLong(splits[1])));
+  public IndexFileDescriptor getLogIndexFile(long startIndex) {
+    for (IndexFileDescriptor descriptor : logIndexFileList) {
+      if (descriptor.startIndex <= startIndex && startIndex <= descriptor.endIndex) {
+        return descriptor;
       }
     }
     logger.debug("can not found the log index file for startIndex={}", startIndex);
@@ -1351,11 +1349,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
             file.getAbsoluteFile(),
             currentReadOffset,
             startAndEndOffset.right);
-        int logSize = ReadWriteIOUtils.readInt(bufferedInputStream);
-        Entry e = null;
-        e = parser.parse(ByteBuffer.wrap(ReadWriteIOUtils.readBytes(bufferedInputStream, logSize)));
-        result.add(e);
-        currentReadOffset = currentReadOffset + Integer.BYTES + logSize;
+        int logBlockSize = ReadWriteIOUtils.readInt(bufferedInputStream);
+        byte[] bytes = ReadWriteIOUtils.readBytes(bufferedInputStream, logBlockSize);
+        ByteBuffer uncompressed = ByteBuffer.wrap(unCompressor.uncompress(bytes));
+        while (uncompressed.remaining() > 0) {
+          Entry e = parser.parse(uncompressed);
+          result.add(e);
+        }
+        currentReadOffset = currentReadOffset + Integer.BYTES + logBlockSize;
       }
     } catch (UnknownLogTypeException e) {
       logger.error("Unknown log detected ", e);
@@ -1365,31 +1366,6 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return result;
   }
 
-  @TestOnly
-  public void setLogDataBuffer(ByteBuffer logDataBuffer) {
-    this.logDataBuffer = logDataBuffer;
-  }
-
-  @TestOnly
-  public void setMaxRaftLogPersistDataSizePerFile(int maxRaftLogPersistDataSizePerFile) {
-    this.maxRaftLogPersistDataSizePerFile = maxRaftLogPersistDataSizePerFile;
-  }
-
-  @TestOnly
-  public void setMaxNumberOfPersistRaftLogFiles(int maxNumberOfPersistRaftLogFiles) {
-    this.maxNumberOfPersistRaftLogFiles = maxNumberOfPersistRaftLogFiles;
-  }
-
-  @TestOnly
-  public List<File> getLogDataFileList() {
-    return logDataFileList;
-  }
-
-  @TestOnly
-  public List<File> getLogIndexFileList() {
-    return logIndexFileList;
-  }
-
   /**
    * VersionController manages the version(a monotonically increasing long) of a storage group. We
    * define that each memtable flush, data deletion, or data update will generate a new version of
@@ -1399,6 +1375,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    * be guaranteed by the caller.
    */
   public interface VersionController {
+
     /**
      * Get the next version number.
      *
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index 97df15c2aa..7c59049bad 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -70,7 +70,7 @@ public class SynchronousSequencer implements LogSequencer {
           e.createTime = System.nanoTime();
 
           // logDispatcher will serialize log, and set log size, and we will use the size after it
-          logManager.append(Collections.singletonList(e));
+          logManager.append(Collections.singletonList(e), true);
 
           votingEntry = LogUtils.buildVotingLog(e, member);
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 619bde93d6..3a35b9dab6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -284,9 +284,21 @@ public class Timer {
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_BEFORE_COMMIT(
+        LOG_DISPATCHER,
+        "from create to before commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT(
+        LOG_DISPATCHER,
+        "from create to ready commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
     RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
         LOG_DISPATCHER,
-        "from create to commit",
+        "from create to committed",
         TIME_SCALE,
         true,
         META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),