You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/19 02:59:09 UTC

[iotdb] branch native_raft updated: Add entry allocator

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 723927ef814 Add entry allocator
723927ef814 is described below

commit 723927ef81447354a8fb106e2c11ac1b3a6249e6
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri May 19 11:02:01 2023 +0800

    Add entry allocator
---
 .../consensus/natraft/protocol/RaftConfig.java     | 16 ++++++
 .../consensus/natraft/protocol/RaftMember.java     | 19 ++++++-
 .../consensus/natraft/protocol/log/Entry.java      | 30 +++++++++--
 .../protocol/log/dispatch/VotingLogList.java       | 17 ++++++-
 .../protocol/log/logtype/ConfigChangeEntry.java    |  5 +-
 .../natraft/protocol/log/logtype/EmptyEntry.java   |  5 +-
 .../natraft/protocol/log/logtype/RequestEntry.java | 12 +++--
 .../manager/DirectorySnapshotRaftLogManager.java   |  7 ++-
 .../protocol/log/manager/RaftLogManager.java       | 55 +++++++++++++-------
 .../protocol/log/recycle/EntryAllocator.java       | 58 ++++++++++++++++++++++
 .../org/apache/iotdb/tsfile/utils/PublicBAOS.java  |  4 ++
 11 files changed, 194 insertions(+), 34 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 76864ba5795..ac9423494ff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -78,6 +78,7 @@ public class RaftConfig {
   private boolean waitApply = true;
   private double flowControlMinFlow = 10_000_000;
   private double flowControlMaxFlow = 100_000_000;
+  private int entryAllocatorCapacity = 100000;
   private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
   private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
   private RPCConfig rpcConfig;
@@ -463,6 +464,14 @@ public class RaftConfig {
     this.applierThreadNum = applierThreadNum;
   }
 
+  public int getEntryAllocatorCapacity() {
+    return entryAllocatorCapacity;
+  }
+
+  public void setEntryAllocatorCapacity(int entryAllocatorCapacity) {
+    this.entryAllocatorCapacity = entryAllocatorCapacity;
+  }
+
   public void loadProperties(Properties properties) {
     logger.debug("Loading properties: {}", properties);
 
@@ -691,6 +700,13 @@ public class RaftConfig {
                 "entry_serialization_buffer_size",
                 String.valueOf(this.getEntryDefaultSerializationBufferSize()))));
 
+
+    this.setEntryAllocatorCapacity(
+        Integer.parseInt(
+            properties.getProperty(
+                "entry_allocator_capacity",
+                String.valueOf(this.getEntryAllocatorCapacity()))));
+
     String consistencyLevel = properties.getProperty("consistency_level");
     if (consistencyLevel != null) {
       this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1db0d383440..2a395a3f45e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogTask;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.DirectorySnapshotRaftLogManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
 import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer;
+import org.apache.iotdb.consensus.natraft.protocol.log.recycle.EntryAllocator;
 import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencer;
 import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencerFactory;
 import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSequencer;
@@ -192,6 +193,7 @@ public class RaftMember {
   private volatile LogAppender logAppender;
   private FlowBalancer flowBalancer;
   private Consumer<ConsensusGroupId> onRemove;
+  private EntryAllocator<RequestEntry> requestEntryAllocator;
 
   public RaftMember(
       String storageDir,
@@ -231,6 +233,7 @@ public class RaftMember {
     votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
     this.heartbeatReqHandler = new HeartbeatReqHandler(this);
     this.electionReqHandler = new ElectionReqHandler(this);
+    this.requestEntryAllocator = new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
     this.logManager =
         new DirectorySnapshotRaftLogManager(
             new SyncLogDequeSerializer(groupId, config),
@@ -238,7 +241,9 @@ public class RaftMember {
             name,
             stateMachine,
             config,
-            this::examineUnappliedEntry);
+            this::examineUnappliedEntry,
+            this::getSafeIndex,
+            this::recycleEntry);
     this.appenderFactory =
         config.isUseFollowerSlidingWindow() ? new Factory() : new BlockingLogAppender.Factory();
     this.logAppender = appenderFactory.create(this, config);
@@ -249,6 +254,16 @@ public class RaftMember {
     initPeerMap();
   }
 
+  public void recycleEntry(Entry entry) {
+    if (entry instanceof RequestEntry) {
+      requestEntryAllocator.recycle(((RequestEntry) entry));
+    }
+  }
+
+  public long getSafeIndex() {
+    return votingLogList.getSafeIndex();
+  }
+
   public void applyConfigChange(ConfigChangeEntry configChangeEntry) {
     List<Peer> newNodes = configChangeEntry.getNewPeers();
     if (!newNodes.equals(this.newNodes)) {
@@ -629,7 +644,7 @@ public class RaftMember {
     }
 
     logger.debug("{}: Processing request {}", name, request);
-    Entry entry = new RequestEntry(request);
+    Entry entry = requestEntryAllocator.Allocate();
     entry.preSerialize();
     entry.receiveTime = System.nanoTime();
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 714d6cd90a2..d46999d7bd8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -56,6 +56,7 @@ public abstract class Entry implements Comparable<Entry> {
   public long applyTime;
   public long waitEndTime;
 
+  protected volatile byte[] recycledBuffer;
   protected volatile ByteBuffer preSerializationCache;
   protected volatile ByteBuffer serializationCache;
 
@@ -63,7 +64,7 @@ public abstract class Entry implements Comparable<Entry> {
     return DEFAULT_SERIALIZATION_BUFFER_SIZE;
   }
 
-  protected abstract ByteBuffer serializeInternal();
+  protected abstract ByteBuffer serializeInternal(byte[] buffer);
 
   /**
    * Perform serialization before indexing to avoid serialization under locked environment. It
@@ -76,7 +77,7 @@ public abstract class Entry implements Comparable<Entry> {
       return;
     }
     long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
-    ByteBuffer byteBuffer = serializeInternal();
+    ByteBuffer byteBuffer = serializeInternal(recycledBuffer);
     Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
     preSerializationCache = byteBuffer;
   }
@@ -97,7 +98,7 @@ public abstract class Entry implements Comparable<Entry> {
       preSerializationCache = null;
     } else {
       long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
-      ByteBuffer byteBuffer = serializeInternal();
+      ByteBuffer byteBuffer = serializeInternal(recycledBuffer);
       Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
       serializationCache = byteBuffer;
     }
@@ -224,4 +225,27 @@ public abstract class Entry implements Comparable<Entry> {
   public void setSerializationCache(ByteBuffer serializationCache) {
     this.serializationCache = serializationCache;
   }
+
+  public void recycle() {
+    currLogTerm = -1;
+    prevTerm = -1;
+    applied = false;
+    exception = null;
+    byteSize = -1;
+    fromThisNode = false;
+    receiveTime = 0;
+    createTime = 0;
+    acceptedTime = 0;
+    committedTime = 0;
+    applyTime = 0;
+    waitEndTime = 0;
+    if (preSerializationCache != null) {
+      recycledBuffer = preSerializationCache.array();
+      preSerializationCache = null;
+    }
+    if (serializationCache != null) {
+      recycledBuffer = serializationCache.array();
+      serializationCache = null;
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 6dd088ec281..527a2ec0509 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
@@ -79,8 +81,6 @@ public class VotingLogList {
    * When an entry of index-term is strongly accepted by a node of acceptingNodeId, record the id in
    * all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
    * from the list.
-   *
-   * @return the lastly removed entry if any.
    */
   public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
     logger.debug(
@@ -141,4 +141,17 @@ public class VotingLogList {
     STRONGLY_ACCEPTED,
     WEAKLY_ACCEPTED
   }
+
+  /**
+   * The safe index is the maximum index that all followers have reached. Entries below this
+   * index no longer need to be stored.
+   *
+   * @return the safe index
+   */
+  public long getSafeIndex() {
+    if (stronglyAcceptedIndices.size() < member.getAllNodes().size() - 1) {
+      return -1;
+    }
+    return stronglyAcceptedIndices.values().stream().min(Long::compareTo).orElse(-1L);
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 62cad1f6b82..26ed266509c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 public class ConfigChangeEntry extends Entry {
 
@@ -40,9 +41,9 @@ public class ConfigChangeEntry extends Entry {
   }
 
   @Override
-  protected ByteBuffer serializeInternal() {
+  protected ByteBuffer serializeInternal(byte[] buffer) {
     ByteArrayOutputStream byteArrayOutputStream =
-        new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+        buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
index db6d7f40daf..231a83b4e0c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 public class EmptyEntry extends Entry {
 
@@ -36,9 +37,9 @@ public class EmptyEntry extends Entry {
   }
 
   @Override
-  protected ByteBuffer serializeInternal() {
+  protected ByteBuffer serializeInternal(byte[] buffer) {
     ByteArrayOutputStream byteArrayOutputStream =
-        new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+        buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
       dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
       dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index d6ba9312cac..f6cb9ec3707 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -34,21 +34,25 @@ import java.util.Objects;
 
 import static org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types.CLIENT_REQUEST;
 
-/** RequestLog contains a non-partitioned request like set storage group. */
+/**
+ * RequestEntry contains a non-partitioned request like set storage group.
+ */
 public class RequestEntry extends Entry {
 
   private static final Logger logger = LoggerFactory.getLogger(RequestEntry.class);
   private volatile IConsensusRequest request;
 
-  public RequestEntry() {}
+  public RequestEntry() {
+  }
 
   public RequestEntry(IConsensusRequest request) {
     setRequest(request);
   }
 
   @Override
-  protected ByteBuffer serializeInternal() {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultSerializationBufferSize());
+  protected ByteBuffer serializeInternal(byte[] buffer) {
+    PublicBAOS byteArrayOutputStream =
+        buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
     int requestSize = 0;
     int requestPos = 0;
     try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index 1f99fd4e2c5..2cd4e39203a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
+import java.util.function.Supplier;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -47,8 +48,10 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
       String name,
       IStateMachine stateMachine,
       RaftConfig config,
-      Consumer<List<Entry>> unappliedEntryExaminer) {
-    super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer);
+      Consumer<List<Entry>> unappliedEntryExaminer,
+      Supplier<Long> safeIndexProvider,
+      Consumer<Entry> entryRecycler) {
+    super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer, safeIndexProvider, entryRecycler);
   }
 
   @Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 1aa12cbe6b7..0c5fc6e936a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.consensus.natraft.protocol.log.manager;
 
+import java.util.function.Supplier;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.consensus.IStateMachine;
@@ -107,6 +108,8 @@ public abstract class RaftLogManager {
 
   protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private long committedEntrySize;
+  private Supplier<Long> safeIndexProvider;
+  private Consumer<Entry> entryRecycler;
 
   protected RaftLogManager(
       StableEntryManager stableEntryManager,
@@ -114,12 +117,15 @@ public abstract class RaftLogManager {
       String name,
       IStateMachine stateMachine,
       RaftConfig config,
-      Consumer<List<Entry>> unappliedEntryExaminer) {
+      Consumer<List<Entry>> unappliedEntryExaminer, Supplier<Long> safeIndexProvider,
+      Consumer<Entry> entryRecycler) {
     this.logApplier = applier;
     this.name = name;
     this.stateMachine = stateMachine;
     this.setStableEntryManager(stableEntryManager);
     this.config = config;
+    this.safeIndexProvider = safeIndexProvider;
+    this.entryRecycler = entryRecycler;
 
     initConf();
     initEntries(unappliedEntryExaminer);
@@ -132,7 +138,7 @@ public abstract class RaftLogManager {
     this.checkLogApplierExecutorService =
         IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
 
-    /** deletion check period of the submitted log */
+    /* deletion check period of the submitted log */
     int logDeleteCheckIntervalSecond = config.getLogDeleteCheckIntervalSecond();
 
     if (logDeleteCheckIntervalSecond > 0) {
@@ -147,7 +153,7 @@ public abstract class RaftLogManager {
 
     this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex);
 
-    /** flush log to file periodically */
+    /* flush log to file periodically */
     if (config.isEnableRaftLogPersistence()) {
       this.applyAllCommittedLogWhenStartUp();
     }
@@ -203,14 +209,12 @@ public abstract class RaftLogManager {
    *
    * <p>
    *
-   * @throws IOException timeout exception
    */
   public abstract void takeSnapshot(RaftMember member);
 
   /**
    * Update the raftNode's hardState(currentTerm,voteFor) and flush to disk.
    *
-   * @param state
    */
   public void updateHardState(HardState state) {
     getStableEntryManager().setHardStateAndFlush(state);
@@ -579,7 +583,7 @@ public abstract class RaftLogManager {
     return entries.size() - 1;
   }
 
-  private void checkCompaction(List<Entry> entries) {
+  private List<Entry> checkCompaction(List<Entry> entries) {
     boolean needToCompactLog = false;
     // calculate the number of old committed entries to be reserved by entry number
     int numToReserveForNew = minNumOfLogsInMem;
@@ -601,11 +605,13 @@ public abstract class RaftLogManager {
     }
 
     // reserve old committed entries with the minimum number
+    List<Entry> removedEntries = Collections.emptyList();
     if (needToCompactLog) {
       int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
       int sizeToReserveForConfig = minNumOfLogsInMem;
-      innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
+      removedEntries = innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
     }
+    return removedEntries;
   }
 
   private void removedCommitted(List<Entry> entries) {
@@ -636,7 +642,6 @@ public abstract class RaftLogManager {
           entry.notify();
         }
       }
-      entry.setSerializationCache(null);
     }
   }
 
@@ -650,6 +655,7 @@ public abstract class RaftLogManager {
       return;
     }
 
+    List<Entry> removedEntries = Collections.emptyList();
     try {
       lock.writeLock().lock();
       long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
@@ -669,13 +675,20 @@ public abstract class RaftLogManager {
               System.nanoTime() - entry.createTime);
         }
       }
-      checkCompaction(entries);
+      removedEntries = checkCompaction(entries);
       commitEntries(entries);
       applyEntries(entries);
       Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.calOperationCostTimeFromStart(startTime);
     } finally {
       lock.writeLock().unlock();
     }
+    recycleEntries(removedEntries);
+  }
+
+  protected void recycleEntries(List<Entry> removedEntries) {
+    for (Entry removedEntry : removedEntries) {
+      entryRecycler.accept(removedEntry);
+    }
   }
 
   /**
@@ -816,24 +829,28 @@ public abstract class RaftLogManager {
 
   /** check whether delete the committed log */
   void checkDeleteLog() {
+    List<Entry> removedEntries = Collections.emptyList();
     try {
       lock.writeLock().lock();
       if (appliedIndex - getFirstIndex() <= minNumOfLogsInMem) {
         return;
       }
-      innerDeleteLog(minNumOfLogsInMem);
+      removedEntries = innerDeleteLog(minNumOfLogsInMem);
     } catch (Exception e) {
       logger.error("{}, error occurred when checking delete log", name, e);
     } finally {
       lock.writeLock().unlock();
     }
+    recycleEntries(removedEntries);
   }
 
-  private void innerDeleteLog(int sizeToReserve) {
-    long appliedLogNum = appliedIndex - getFirstIndex();
-    long removeSize = appliedLogNum - sizeToReserve;
+  private List<Entry> innerDeleteLog(int sizeToReserve) {
+    long safeIndex = safeIndexProvider.get();
+    long indexToReserve = Math.max(appliedIndex, safeIndex);
+    long removableLogNum = indexToReserve - getFirstIndex();
+    long removeSize = removableLogNum - sizeToReserve;
     if (removeSize <= 0) {
-      return;
+      return Collections.emptyList();
     }
 
     long compactIndex = getFirstIndex() + removeSize;
@@ -847,7 +864,7 @@ public abstract class RaftLogManager {
         removeSize,
         commitIndex - getFirstIndex(),
         appliedIndex);
-    compactEntries(compactIndex);
+    List<Entry> removedEntries = compactEntries(compactIndex);
     if (config.isEnableRaftLogPersistence()) {
       getStableEntryManager().removeCompactedEntries(compactIndex);
     }
@@ -857,16 +874,17 @@ public abstract class RaftLogManager {
         getFirstIndex(),
         getLastLogIndex(),
         commitIndex - getFirstIndex());
+    return removedEntries;
   }
 
-  void compactEntries(long compactIndex) {
+  List<Entry> compactEntries(long compactIndex) {
     long firstIndex = getFirstIndex();
     if (compactIndex < firstIndex) {
       logger.info(
           "entries before request index ({}) have been compacted, and the compactIndex is ({})",
           firstIndex,
           compactIndex);
-      return;
+      return Collections.emptyList();
     }
     long lastLogIndex = getLastLogIndex();
     if (compactIndex >= lastLogIndex) {
@@ -877,9 +895,12 @@ public abstract class RaftLogManager {
     for (int i = 0; i < index; i++) {
       committedEntrySize -= entries.get(0).estimateSize();
     }
+    List<Entry> removedEntries = Collections.emptyList();
     if (index > 0) {
+      removedEntries = new ArrayList<>(entries.subList(0, index));
       entries.subList(0, index).clear();
     }
+    return removedEntries;
   }
 
   public Object getLogUpdateCondition(long logIndex) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
new file mode 100644
index 00000000000..a603624d764
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.consensus.natraft.protocol.log.recycle;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.function.Supplier;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
+public class EntryAllocator<T extends Entry> {
+  private Queue<T> entryPool;
+  private Supplier<T> entryFactory;
+  private Queue<T> recyclingEntries;
+  private Supplier<Long> safeIndexProvider;
+
+  public EntryAllocator(RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
+    this.entryPool = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
+    this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity() / 2);
+    this.entryFactory = entryFactory;
+    this.safeIndexProvider = safeIndexProvider;
+  }
+
+  public T Allocate() {
+    T entry = entryPool.poll();
+    if (entry == null) {
+      entry = entryFactory.get();
+    }
+    return entry;
+  }
+
+  public void recycle(T entry) {
+    Long safeIndex = safeIndexProvider.get();
+    if (entry.getCurrLogIndex() <= safeIndex) {
+      entry.recycle();
+      entryPool.add(entry);
+    } else {
+      recyclingEntries.add(entry);
+    }
+
+    checkRecyclingEntries();
+  }
+
+  public void checkRecyclingEntries() {
+    Long safeIndex = safeIndexProvider.get();
+    while (!recyclingEntries.isEmpty()) {
+      T recyclingEntry = recyclingEntries.poll();
+      if (recyclingEntry != null && recyclingEntry.getCurrLogIndex() <= safeIndex) {
+        recyclingEntry.recycle();
+        entryPool.add(recyclingEntry);
+      } else {
+        if (recyclingEntry != null) {
+          recyclingEntries.add(recyclingEntry);
+        }
+        break;
+      }
+    }
+  }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 90c2d0f22cf..5a3ceb2922d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -37,6 +37,10 @@ public class PublicBAOS extends ByteArrayOutputStream {
     super(size);
   }
 
+  public PublicBAOS(byte[] buffer) {
+    this.buf = buffer;
+  }
+
   /**
    * get current all bytes data
    *