You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/19 03:37:16 UTC

[iotdb] branch native_raft updated: do not use safe index to remove log

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new 4402fe1601f do not use safe index to remove log
4402fe1601f is described below

commit 4402fe1601fb2a3164fa5de506c6bfcf902df0ff
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri May 19 11:40:08 2023 +0800

    do not use safe index to remove log
---
 .../iotdb/consensus/natraft/protocol/RaftMember.java     |  3 ++-
 .../natraft/protocol/log/manager/RaftLogManager.java     |  5 ++---
 .../natraft/protocol/log/recycle/EntryAllocator.java     | 16 ++++++++--------
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index b460fbc5a10..1b40503a5c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -233,7 +233,8 @@ public class RaftMember {
     votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
     this.heartbeatReqHandler = new HeartbeatReqHandler(this);
     this.electionReqHandler = new ElectionReqHandler(this);
-    this.requestEntryAllocator = new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
+    this.requestEntryAllocator =
+        new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
     this.logManager =
         new DirectorySnapshotRaftLogManager(
             new SyncLogDequeSerializer(groupId, config),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index aa67fc7641f..337cc04e9d1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -652,7 +652,7 @@ public abstract class RaftLogManager {
       return;
     }
 
-    List<Entry> removedEntries = Collections.emptyList();
+    List<Entry> removedEntries;
     try {
       lock.writeLock().lock();
       long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
@@ -842,8 +842,7 @@ public abstract class RaftLogManager {
   }
 
   private List<Entry> innerDeleteLog(int sizeToReserve) {
-    long safeIndex = safeIndexProvider.get();
-    long indexToReserve = Math.max(appliedIndex, safeIndex);
+    long indexToReserve = appliedIndex;
     long removableLogNum = indexToReserve - getFirstIndex();
     long removeSize = removableLogNum - sizeToReserve;
     if (removeSize <= 0) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
index 55795bc18a0..3b92e366aec 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -22,20 +22,20 @@ package org.apache.iotdb.consensus.natraft.protocol.log.recycle;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
 
-import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.function.Supplier;
 
 public class EntryAllocator<T extends Entry> {
-  private Queue<T> entryPool;
+  private BlockingQueue<T> entryPool;
   private Supplier<T> entryFactory;
-  private Queue<T> recyclingEntries;
+  private BlockingQueue<T> recyclingEntries;
   private Supplier<Long> safeIndexProvider;
 
   public EntryAllocator(
       RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
     this.entryPool = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
-    this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity() / 2);
+    this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
     this.entryFactory = entryFactory;
     this.safeIndexProvider = safeIndexProvider;
   }
@@ -52,9 +52,9 @@ public class EntryAllocator<T extends Entry> {
     Long safeIndex = safeIndexProvider.get();
     if (entry.getCurrLogIndex() <= safeIndex) {
       entry.recycle();
-      entryPool.add(entry);
+      entryPool.offer(entry);
     } else {
-      recyclingEntries.add(entry);
+      recyclingEntries.offer(entry);
     }
 
     checkRecyclingEntries();
@@ -66,10 +66,10 @@ public class EntryAllocator<T extends Entry> {
       T recyclingEntry = recyclingEntries.poll();
       if (recyclingEntry != null && recyclingEntry.getCurrLogIndex() <= safeIndex) {
         recyclingEntry.recycle();
-        entryPool.add(recyclingEntry);
+        entryPool.offer(recyclingEntry);
       } else {
         if (recyclingEntry != null) {
-          recyclingEntries.add(recyclingEntry);
+          recyclingEntries.offer(recyclingEntry);
         }
         break;
       }