You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/04/26 02:14:08 UTC

[iotdb] branch native_raft updated: refactor log flush with dual buffer

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 2c2787fd85 refactor log flush with dual buffer
2c2787fd85 is described below

commit 2c2787fd8532b0d8992fee07fe2ffdfcc98b64f7
Author: Tian Jiang <jt...@163.com>
AuthorDate: Wed Apr 26 10:16:34 2023 +0800

    refactor log flush with dual buffer
---
 .../consensus/natraft/protocol/RaftMember.java     |   1 +
 .../protocol/log/manager/RaftLogManager.java       |   4 +
 .../manager/serialization/StableEntryManager.java  |   2 +
 .../serialization/SyncLogDequeSerializer.java      | 209 ++++++++++++++-------
 .../iotdb/consensus/natraft/utils/NodeReport.java  |   5 +
 .../commons/concurrent/dynamic/DynamicThread.java  |   4 +-
 6 files changed, 158 insertions(+), 67 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1c2711e184..f7f58b5721 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1384,6 +1384,7 @@ public class RaftMember {
         lastReportIndex,
         logManager.getCommitLogIndex(),
         logManager.getCommitLogTerm(),
+        logManager.getPersistedLogIndex(),
         readOnly,
         heartbeatThread.getLastHeartbeatReceivedTime(),
         prevLastLogIndex,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index d73613c7a3..1aa12cbe6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -973,4 +973,8 @@ public abstract class RaftLogManager {
   public ReentrantReadWriteLock getLock() {
     return lock;
   }
+
+  public long getPersistedLogIndex() {
+    return stableEntryManager.getPersistedLogIndex();
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index a57a8a0774..f1a7e6ebfb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -62,4 +62,6 @@ public interface StableEntryManager {
    * should be cleaned
    */
   void clearAllLogs(long commitIndex);
+
+  long getPersistedLogIndex();
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index d4b3240491..59947f3ff7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -18,6 +18,10 @@
  */
 package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -71,10 +75,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private static final String LOG_DATA_FILE_SUFFIX = "data";
   private static final String LOG_INDEX_FILE_SUFFIX = "idx";
 
-  /** the log data files */
+  /**
+   * the log data files
+   */
   private List<File> logDataFileList;
 
-  /** the log index files */
+  /**
+   * the log index files
+   */
   private List<IndexFileDescriptor> logIndexFileList;
 
   private LogParser parser = LogParser.getINSTANCE();
@@ -83,11 +91,16 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private FileOutputStream currentLogIndexOutputStream;
   private LogManagerMeta meta;
   private HardState state;
+  private String name;
 
-  /** min version of available log */
+  /**
+   * min version of available log
+   */
   private long minAvailableVersion = 0;
 
-  /** max version of available log */
+  /**
+   * max version of available log
+   */
   private long maxAvailableVersion = Long.MAX_VALUE;
 
   private String logDir;
@@ -96,6 +109,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private ByteBuffer logDataBuffer;
   private ByteBuffer logIndexBuffer;
+  private ByteBuffer flushingLogDataBuffer;
+  private ByteBuffer flushingLogIndexBuffer;
 
   private long offsetOfTheCurrentLogDataOutputStream = 0;
 
@@ -121,8 +136,10 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private int maxPersistRaftLogNumberOnDisk;
 
-  private ScheduledExecutorService persistLogDeleteExecutorService;
+  private ScheduledExecutorService persistLogExecutorService;
   private ScheduledFuture<?> persistLogDeleteLogFuture;
+  private ExecutorService flushingLogExecutorService;
+  private volatile Future<?> flushingLogFuture;
 
   /**
    * indicate the first raft log's index of {@link SyncLogDequeSerializer#logIndexOffsetList}, for
@@ -133,6 +150,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private long firstLogIndex = 0;
 
   private long lastLogIndex = 0;
+  private long persistedLogIndex = 0;
 
   /**
    * the index and file offset of the log, for example, the first pair is the offset of index
@@ -142,7 +160,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   private static final int LOG_DELETE_CHECK_INTERVAL_SECOND = 5;
 
-  /** the lock uses when change the log data files or log index files */
+  /**
+   * the lock used when changing the log data files or log index files
+   */
   private final Lock lock = new ReentrantLock();
 
   private volatile boolean isClosed = false;
@@ -153,6 +173,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   private void initCommonProperties() {
     logDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
     logIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+    flushingLogDataBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+    flushingLogIndexBuffer = ByteBuffer.allocate(config.getRaftLogBufferSize());
+
     maxNumberOfLogsPerFetchOnDisk = config.getMaxNumberOfLogsPerFetchOnDisk();
     maxRaftLogIndexSizeInMemory = config.getMaxRaftLogIndexSizeInMemory();
     maxNumberOfPersistRaftLogFiles = config.getMaxNumberOfPersistRaftLogFiles();
@@ -170,18 +193,20 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     } catch (IOException e) {
       logger.error("log serializer build version controller failed", e);
     }
-    this.persistLogDeleteExecutorService =
+    this.persistLogExecutorService =
         new ScheduledThreadPoolExecutor(
             1,
             new BasicThreadFactory.Builder()
                 .namingPattern("persist-log-delete-" + logDir)
                 .daemon(true)
                 .build());
+    this.flushingLogExecutorService = Executors.newSingleThreadExecutor(
+        (r) -> new Thread(r, name + "-flushRaftLog"));
 
     this.persistLogDeleteLogFuture =
         ScheduledExecutorUtil.safelyScheduleAtFixedRate(
-            persistLogDeleteExecutorService,
-            this::checkDeletePersistRaftLog,
+            persistLogExecutorService,
+            this::checkPersistRaftLog,
             LOG_DELETE_CHECK_INTERVAL_SECOND,
             LOG_DELETE_CHECK_INTERVAL_SECOND,
             TimeUnit.SECONDS);
@@ -194,6 +219,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    */
   public SyncLogDequeSerializer(ConsensusGroupId groupId, RaftConfig config) {
     this.config = config;
+    name = groupId.toString();
     logDir = getLogDir(groupId);
     initCommonProperties();
     initMetaAndLogFiles();
@@ -204,7 +230,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return systemDir + File.separator + groupId + File.separator + "raftLog" + File.separator;
   }
 
-  /** for log tools */
+  /**
+   * for log tools
+   */
   @Override
   public LogManagerMeta getMeta() {
     return meta;
@@ -216,7 +244,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     meta.setLastAppliedIndex(applyIndex);
   }
 
-  /** Recover all the logs in disk. This function will be called once this instance is created. */
+  /**
+   * Recover all the logs in disk. This function will be called once this instance is created.
+   */
   @Override
   public List<Entry> getAllEntriesAfterAppliedIndex() {
     logger.debug(
@@ -230,8 +260,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
    * is uncommitted for persistent LogManagerMeta(meta's info is stale).We need to recover these
    * already persistent logs.
    *
-   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually been
-   * flushed to 7,when we restart cluster,we need to recover 6 and 7.
+   * <p>For example,commitIndex is 5 in persistent LogManagerMeta,But the log file has actually
+   * been flushed to 7,when we restart cluster,we need to recover 6 and 7.
    *
    * <p>Maybe,we can extract getAllEntriesAfterAppliedIndex and getAllEntriesAfterCommittedIndex
    * into getAllEntriesByIndex,but now there are too many test cases using it.
@@ -375,37 +405,63 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     if (isClosed || logDataBuffer.position() == 0) {
       return;
     }
-    lock.lock();
-    try {
-      // write into disk
+    if (flushingLogFuture != null && !flushingLogFuture.isDone()) {
       try {
-        checkStream();
-        // 1. write to the log data file
-        byte[] compressed = compressor.compress(logDataBuffer.array(), 0, logDataBuffer.position());
-        ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
-        currentLogDataOutputStream.write(compressed);
-        logIndexOffsetList.add(new Pair<>(lastLogIndex, offsetOfTheCurrentLogDataOutputStream));
-        logIndexBuffer.putLong(lastLogIndex);
-        logIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
-        ReadWriteIOUtils.writeWithoutSize(
-            logIndexBuffer, 0, logIndexBuffer.position(), currentLogIndexOutputStream);
-        offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
-        if (config.getFlushRaftLogThreshold() == 0) {
-          currentLogDataOutputStream.getChannel().force(true);
-          currentLogIndexOutputStream.getChannel().force(true);
-        }
-      } catch (IOException e) {
-        logger.error("Error in logs serialization: ", e);
-        return;
+        flushingLogFuture.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        logger.error("Unexpected exception when flushing log in {}", name, e);
+        throw new RuntimeException(e);
       }
-      logDataBuffer.clear();
-      logIndexBuffer.clear();
-      logger.debug("End flushing log buffer.");
-    } finally {
-      lock.unlock();
+    } else {
+      switchBuffer();
+      flushingLogFuture = flushingLogExecutorService.submit(() -> flushLogBufferTask(lastLogIndex));
     }
   }
 
+  private void switchBuffer() {
+    ByteBuffer temp = logDataBuffer;
+    logDataBuffer = flushingLogDataBuffer;
+    flushingLogDataBuffer = temp;
+    temp = logIndexBuffer;
+    logIndexBuffer = flushingLogIndexBuffer;
+    flushingLogIndexBuffer = temp;
+  }
+
+  private void flushLogBufferTask(long currentLastIndex) {
+    // write into disk
+    try {
+      checkStream();
+      // 1. write to the log data file
+      byte[] compressed = compressor.compress(flushingLogDataBuffer.array(), 0,
+          flushingLogDataBuffer.position());
+      ReadWriteIOUtils.write(compressed.length, currentLogDataOutputStream);
+      logIndexOffsetList.add(new Pair<>(currentLastIndex, offsetOfTheCurrentLogDataOutputStream));
+      flushingLogIndexBuffer.putLong(currentLastIndex);
+      flushingLogIndexBuffer.putLong(offsetOfTheCurrentLogDataOutputStream);
+      offsetOfTheCurrentLogDataOutputStream += Integer.BYTES + compressed.length;
+
+      currentLogDataOutputStream.write(compressed);
+      ReadWriteIOUtils.writeWithoutSize(
+          flushingLogIndexBuffer, 0, flushingLogIndexBuffer.position(), currentLogIndexOutputStream);
+      if (config.getFlushRaftLogThreshold() == 0) {
+        currentLogDataOutputStream.getChannel().force(true);
+        currentLogIndexOutputStream.getChannel().force(true);
+      }
+      persistedLogIndex = currentLastIndex;
+    } catch (IOException e) {
+      logger.error("Error in logs serialization: ", e);
+      return;
+    }
+
+    flushingLogDataBuffer.clear();
+    flushingLogIndexBuffer.clear();
+
+    switchBuffer();
+    logger.debug("End flushing log buffer.");
+  }
+
   private void forceFlushLogBufferWithoutCloseFile() {
     if (isClosed) {
       return;
@@ -429,7 +485,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** flush the log buffer and check if the file needs to be closed */
+  /**
+   * flush the log buffer and check if the file needs to be closed
+   */
   @Override
   public void forceFlushLogBuffer() {
     lock.lock();
@@ -473,7 +531,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data */
+  /**
+   * The file name rules are as follows: ${startLogIndex}-${endLogIndex}-${version}.data
+   */
   private void recoverLogFiles() {
     // 1. first we should recover the log index file
     recoverLogFiles(LOG_INDEX_FILE_SUFFIX);
@@ -520,9 +580,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * Check that the file is legal or not
    *
-   * @param file file needs to be check
-   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or {@link
-   *     SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
+   * @param file     file that needs to be checked
+   * @param fileType {@link SyncLogDequeSerializer#LOG_DATA_FILE_SUFFIX} or
+   *                 {@link SyncLogDequeSerializer#LOG_INDEX_FILE_SUFFIX}
    * @return true if the file legal otherwise false
    */
   private boolean checkLogFile(File file, String fileType) {
@@ -750,7 +810,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     }
   }
 
-  /** for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version} */
+  /**
+   * for unclosed file, the file name is ${startIndex}-${Long.MAX_VALUE}-{version}
+   */
   private void createNewLogFile(String dirName, long startLogIndex) throws IOException {
     lock.lock();
     try {
@@ -826,6 +888,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       meta = new LogManagerMeta();
       state = new HardState();
     }
+
+    persistedLogIndex = meta.getLastLogIndex();
     logger.info(
         "Recovered log meta: {}, availableVersion: [{},{}], state: {}",
         meta,
@@ -877,15 +941,15 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       lock.unlock();
     }
 
-    if (persistLogDeleteExecutorService != null) {
-      persistLogDeleteExecutorService.shutdownNow();
+    if (persistLogExecutorService != null) {
+      persistLogExecutorService.shutdownNow();
       persistLogDeleteLogFuture.cancel(true);
       try {
-        persistLogDeleteExecutorService.awaitTermination(20, TimeUnit.SECONDS);
+        persistLogExecutorService.awaitTermination(20, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      persistLogDeleteExecutorService = null;
+      persistLogExecutorService = null;
     }
   }
 
@@ -945,10 +1009,14 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return Long.parseLong(file.getName().split(FILE_NAME_SEPARATOR)[2]);
   }
 
-  public void checkDeletePersistRaftLog() {
-    // 1. check the log index offset list size
+  public void checkPersistRaftLog() {
+
     lock.lock();
     try {
+      // 1. flush logs in buffer
+      flushLogBuffer();
+
+      // 2. check the log index offset list size
       if (logIndexOffsetList.size() > maxRaftLogIndexSizeInMemory) {
         int compactIndex = logIndexOffsetList.size() - maxRaftLogIndexSizeInMemory;
         logIndexOffsetList.subList(0, compactIndex).clear();
@@ -958,7 +1026,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       lock.unlock();
     }
 
-    // 2. check the persist log file number
+    // 3. check the persist log file number
     lock.lock();
     try {
       while (logDataFileList.size() > maxNumberOfPersistRaftLogFiles) {
@@ -968,7 +1036,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
       lock.unlock();
     }
 
-    // 3. check the persist log index number
+    // 4. check the persisted log index number
     lock.lock();
     try {
       while (logDataFileList.size() > 1) {
@@ -1095,7 +1163,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex the log end index
+   * @param endIndex   the log end index
    * @return the raft log which index between [startIndex, endIndex] or empty if not found
    */
   @Override
@@ -1200,9 +1268,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
 
   /**
    * @param startIndex the log start index
-   * @param endIndex the log end index
+   * @param endIndex   the log end index
    * @return first value-> the log data file, second value-> the left value is the start offset of
-   *     the file, the right is the end offset of the file
+   * the file, the right is the end offset of the file
    */
   private List<Pair<File, Pair<Long, Long>>> getLogDataFileAndOffset(
       long startIndex, long endIndex) {
@@ -1269,8 +1337,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log index file which contains the start index; the
-   *     second pair's first value is the file's start log index. the second pair's second value is
-   *     the file's end log index. null if not found
+   * second pair's first value is the file's start log index. the second pair's second value is the
+   * file's end log index. null if not found
    */
   public IndexFileDescriptor getLogIndexFile(long startIndex) {
     for (IndexFileDescriptor descriptor : logIndexFileList) {
@@ -1285,8 +1353,8 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   /**
    * @param startIndex the start log index
    * @return the first value of the pair is the log data file which contains the start index; the
-   *     second pair's first value is the file's start log index. the second pair's second value is
-   *     the file's end log index. null if not found
+   * second pair's first value is the file's start log index. the second pair's second value is the
+   * file's end log index. null if not found
    */
   public Pair<File, Pair<Long, Long>> getLogDataFile(long startIndex) {
     for (File file : logDataFileList) {
@@ -1305,9 +1373,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
   }
 
   /**
-   * @param file the log data file
+   * @param file              the log data file
    * @param startAndEndOffset the left value is the start offset of the file, the right is the end
-   *     offset of the file
+   *                          offset of the file
    * @return the logs between start offset and end offset
    */
   private List<Entry> getLogsFromOneLogDataFile(File file, Pair<Long, Long> startAndEndOffset) {
@@ -1364,6 +1432,11 @@ public class SyncLogDequeSerializer implements StableEntryManager {
     return result;
   }
 
+  @Override
+  public long getPersistedLogIndex() {
+    return persistedLogIndex;
+  }
+
   /**
    * VersionController manages the version(a monotonically increasing long) of a storage group. We
    * define that each memtable flush, data deletion, or data update will generate a new version of
@@ -1400,7 +1473,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
        * automatically increased by saveInterval to avoid conflicts.
        */
       private static long saveInterval = 100;
-      /** time partition id to dividing time series into different storage group */
+      /**
+       * time partition id to dividing time series into different storage group
+       */
       private long timePartitionId;
 
       private long prevVersion;
@@ -1414,7 +1489,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         restore();
       }
 
-      /** only used for upgrading */
+      /**
+       * only used for upgrading
+       */
       public SimpleFileVersionController(String directoryPath) throws IOException {
         this.directoryPath = directoryPath + File.separator + UPGRADE_DIR;
         restore();
@@ -1477,7 +1554,9 @@ public class SyncLogDequeSerializer implements StableEntryManager {
         prevVersion = currVersion;
       }
 
-      /** recovery from disk */
+      /**
+       * recovery from disk
+       */
       @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
       private void restore() throws IOException {
         File directory = SystemFileFactory.INSTANCE.getFile(directoryPath);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
index e6220a188d..ebbd5a6188 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeReport.java
@@ -66,6 +66,7 @@ public class NodeReport {
     long lastLogIndex;
     long commitIndex;
     long commitTerm;
+    long persistedIndex;
     boolean isReadOnly;
     long lastHeartbeatReceivedTime;
     long prevLastLogIndex;
@@ -79,6 +80,7 @@ public class NodeReport {
         long lastLogIndex,
         long commitIndex,
         long commitTerm,
+        long persistedIndex,
         boolean isReadOnly,
         long lastHeartbeatReceivedTime,
         long prevLastLogIndex,
@@ -90,6 +92,7 @@ public class NodeReport {
       this.lastLogIndex = lastLogIndex;
       this.commitIndex = commitIndex;
       this.commitTerm = commitTerm;
+      this.persistedIndex = persistedIndex;
       this.isReadOnly = isReadOnly;
       this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
       this.prevLastLogIndex = prevLastLogIndex;
@@ -114,6 +117,8 @@ public class NodeReport {
           + commitIndex
           + ", commitTerm="
           + commitTerm
+          + ", persistedIndex="
+          + persistedIndex
           + ", appliedLogIndex="
           + maxAppliedLogIndex
           + ", readOnly="
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
index 9fcb16ec8a..e301ba4400 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/dynamic/DynamicThread.java
@@ -36,8 +36,8 @@ public abstract class DynamicThread implements Runnable {
   private long idleTimeSum;
   private long runningTimeSum;
   // TODO: add configuration for the values
-  private double maximumIdleRatio = 0.5;
-  private double minimumIdleRatio = 0.1;
+  private double maximumIdleRatio = 0.8;
+  private double minimumIdleRatio = 0.2;
   private long minimumRunningTime = 10_000_000_000L;
 
   public DynamicThread(DynamicThreadGroup threadGroup) {