You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/19 03:37:16 UTC
[iotdb] branch native_raft updated: do not use safe index to remove log
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 4402fe1601f do not use safe index to remove log
4402fe1601f is described below
commit 4402fe1601fb2a3164fa5de506c6bfcf902df0ff
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri May 19 11:40:08 2023 +0800
do not use safe index to remove log
---
.../iotdb/consensus/natraft/protocol/RaftMember.java | 3 ++-
.../natraft/protocol/log/manager/RaftLogManager.java | 5 ++---
.../natraft/protocol/log/recycle/EntryAllocator.java | 16 ++++++++--------
3 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index b460fbc5a10..1b40503a5c3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -233,7 +233,8 @@ public class RaftMember {
votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
this.heartbeatReqHandler = new HeartbeatReqHandler(this);
this.electionReqHandler = new ElectionReqHandler(this);
- this.requestEntryAllocator = new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
+ this.requestEntryAllocator =
+ new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
this.logManager =
new DirectorySnapshotRaftLogManager(
new SyncLogDequeSerializer(groupId, config),
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index aa67fc7641f..337cc04e9d1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -652,7 +652,7 @@ public abstract class RaftLogManager {
return;
}
- List<Entry> removedEntries = Collections.emptyList();
+ List<Entry> removedEntries;
try {
lock.writeLock().lock();
long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
@@ -842,8 +842,7 @@ public abstract class RaftLogManager {
}
private List<Entry> innerDeleteLog(int sizeToReserve) {
- long safeIndex = safeIndexProvider.get();
- long indexToReserve = Math.max(appliedIndex, safeIndex);
+ long indexToReserve = appliedIndex;
long removableLogNum = indexToReserve - getFirstIndex();
long removeSize = removableLogNum - sizeToReserve;
if (removeSize <= 0) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
index 55795bc18a0..3b92e366aec 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -22,20 +22,20 @@ package org.apache.iotdb.consensus.natraft.protocol.log.recycle;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
public class EntryAllocator<T extends Entry> {
- private Queue<T> entryPool;
+ private BlockingQueue<T> entryPool;
private Supplier<T> entryFactory;
- private Queue<T> recyclingEntries;
+ private BlockingQueue<T> recyclingEntries;
private Supplier<Long> safeIndexProvider;
public EntryAllocator(
RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
this.entryPool = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
- this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity() / 2);
+ this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
this.entryFactory = entryFactory;
this.safeIndexProvider = safeIndexProvider;
}
@@ -52,9 +52,9 @@ public class EntryAllocator<T extends Entry> {
Long safeIndex = safeIndexProvider.get();
if (entry.getCurrLogIndex() <= safeIndex) {
entry.recycle();
- entryPool.add(entry);
+ entryPool.offer(entry);
} else {
- recyclingEntries.add(entry);
+ recyclingEntries.offer(entry);
}
checkRecyclingEntries();
@@ -66,10 +66,10 @@ public class EntryAllocator<T extends Entry> {
T recyclingEntry = recyclingEntries.poll();
if (recyclingEntry != null && recyclingEntry.getCurrLogIndex() <= safeIndex) {
recyclingEntry.recycle();
- entryPool.add(recyclingEntry);
+ entryPool.offer(recyclingEntry);
} else {
if (recyclingEntry != null) {
- recyclingEntries.add(recyclingEntry);
+ recyclingEntries.offer(recyclingEntry);
}
break;
}