You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/19 02:59:09 UTC
[iotdb] branch native_raft updated: Add entry allocator
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 723927ef814 Add entry allocator
723927ef814 is described below
commit 723927ef81447354a8fb106e2c11ac1b3a6249e6
Author: Tian Jiang <jt...@163.com>
AuthorDate: Fri May 19 11:02:01 2023 +0800
Add entry allocator
---
.../consensus/natraft/protocol/RaftConfig.java | 16 ++++++
.../consensus/natraft/protocol/RaftMember.java | 19 ++++++-
.../consensus/natraft/protocol/log/Entry.java | 30 +++++++++--
.../protocol/log/dispatch/VotingLogList.java | 17 ++++++-
.../protocol/log/logtype/ConfigChangeEntry.java | 5 +-
.../natraft/protocol/log/logtype/EmptyEntry.java | 5 +-
.../natraft/protocol/log/logtype/RequestEntry.java | 12 +++--
.../manager/DirectorySnapshotRaftLogManager.java | 7 ++-
.../protocol/log/manager/RaftLogManager.java | 55 +++++++++++++-------
.../protocol/log/recycle/EntryAllocator.java | 58 ++++++++++++++++++++++
.../org/apache/iotdb/tsfile/utils/PublicBAOS.java | 4 ++
11 files changed, 194 insertions(+), 34 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 76864ba5795..ac9423494ff 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -78,6 +78,7 @@ public class RaftConfig {
private boolean waitApply = true;
private double flowControlMinFlow = 10_000_000;
private double flowControlMaxFlow = 100_000_000;
+ private int entryAllocatorCapacity = 100000;
private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
private RPCConfig rpcConfig;
@@ -463,6 +464,14 @@ public class RaftConfig {
this.applierThreadNum = applierThreadNum;
}
+ public int getEntryAllocatorCapacity() { // capacity used to size the entry allocator's queues
+ return entryAllocatorCapacity;
+ }
+
+ public void setEntryAllocatorCapacity(int entryAllocatorCapacity) { // also called by loadProperties
+ this.entryAllocatorCapacity = entryAllocatorCapacity;
+ }
+
public void loadProperties(Properties properties) {
logger.debug("Loading properties: {}", properties);
@@ -691,6 +700,13 @@ public class RaftConfig {
"entry_serialization_buffer_size",
String.valueOf(this.getEntryDefaultSerializationBufferSize()))));
+
+ this.setEntryAllocatorCapacity(
+ Integer.parseInt(
+ properties.getProperty(
+ "entry_allocator_capacity",
+ String.valueOf(this.getEntryAllocatorCapacity()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 1db0d383440..2a395a3f45e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -67,6 +67,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogTask;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.DirectorySnapshotRaftLogManager;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer;
+import org.apache.iotdb.consensus.natraft.protocol.log.recycle.EntryAllocator;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencer;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencerFactory;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSequencer;
@@ -192,6 +193,7 @@ public class RaftMember {
private volatile LogAppender logAppender;
private FlowBalancer flowBalancer;
private Consumer<ConsensusGroupId> onRemove;
+ private EntryAllocator<RequestEntry> requestEntryAllocator;
public RaftMember(
String storageDir,
@@ -231,6 +233,7 @@ public class RaftMember {
votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
this.heartbeatReqHandler = new HeartbeatReqHandler(this);
this.electionReqHandler = new ElectionReqHandler(this);
+ this.requestEntryAllocator = new EntryAllocator<>(config, RequestEntry::new, this::getSafeIndex);
this.logManager =
new DirectorySnapshotRaftLogManager(
new SyncLogDequeSerializer(groupId, config),
@@ -238,7 +241,9 @@ public class RaftMember {
name,
stateMachine,
config,
- this::examineUnappliedEntry);
+ this::examineUnappliedEntry,
+ this::getSafeIndex,
+ this::recycleEntry);
this.appenderFactory =
config.isUseFollowerSlidingWindow() ? new Factory() : new BlockingLogAppender.Factory();
this.logAppender = appenderFactory.create(this, config);
@@ -249,6 +254,16 @@ public class RaftMember {
initPeerMap();
}
+ public void recycleEntry(Entry entry) {
+ if (entry instanceof RequestEntry) { // only RequestEntry has an allocator; other types are ignored
+ requestEntryAllocator.recycle(((RequestEntry) entry));
+ }
+ }
+
+ public long getSafeIndex() { // delegates to the voting log list
+ return votingLogList.getSafeIndex();
+ }
+
public void applyConfigChange(ConfigChangeEntry configChangeEntry) {
List<Peer> newNodes = configChangeEntry.getNewPeers();
if (!newNodes.equals(this.newNodes)) {
@@ -629,7 +644,7 @@ public class RaftMember {
}
logger.debug("{}: Processing request {}", name, request);
- Entry entry = new RequestEntry(request);
+ Entry entry = requestEntryAllocator.Allocate();
entry.preSerialize();
entry.receiveTime = System.nanoTime();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 714d6cd90a2..d46999d7bd8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -56,6 +56,7 @@ public abstract class Entry implements Comparable<Entry> {
public long applyTime;
public long waitEndTime;
+ protected volatile byte[] recycledBuffer;
protected volatile ByteBuffer preSerializationCache;
protected volatile ByteBuffer serializationCache;
@@ -63,7 +64,7 @@ public abstract class Entry implements Comparable<Entry> {
return DEFAULT_SERIALIZATION_BUFFER_SIZE;
}
- protected abstract ByteBuffer serializeInternal();
+ protected abstract ByteBuffer serializeInternal(byte[] buffer);
/**
* Perform serialization before indexing to avoid serialization under locked environment. It
@@ -76,7 +77,7 @@ public abstract class Entry implements Comparable<Entry> {
return;
}
long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
- ByteBuffer byteBuffer = serializeInternal();
+ ByteBuffer byteBuffer = serializeInternal(recycledBuffer);
Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
preSerializationCache = byteBuffer;
}
@@ -97,7 +98,7 @@ public abstract class Entry implements Comparable<Entry> {
preSerializationCache = null;
} else {
long startTime = Statistic.SERIALIZE_ENTRY.getOperationStartTime();
- ByteBuffer byteBuffer = serializeInternal();
+ ByteBuffer byteBuffer = serializeInternal(recycledBuffer);
Statistic.SERIALIZE_ENTRY.calOperationCostTimeFromStart(startTime);
serializationCache = byteBuffer;
}
@@ -224,4 +225,27 @@ public abstract class Entry implements Comparable<Entry> {
public void setSerializationCache(ByteBuffer serializationCache) {
this.serializationCache = serializationCache;
}
+
+ public void recycle() { // resets per-use state so this entry object can be handed out again
+ currLogTerm = -1; // NOTE(review): currLogIndex is not reset in this method — confirm callers reassign it
+ prevTerm = -1;
+ applied = false;
+ exception = null;
+ byteSize = -1;
+ fromThisNode = false;
+ receiveTime = 0;
+ createTime = 0;
+ acceptedTime = 0;
+ committedTime = 0;
+ applyTime = 0;
+ waitEndTime = 0;
+ if (preSerializationCache != null) { // NOTE(review): array() needs an accessible backing array — confirm
+ recycledBuffer = preSerializationCache.array(); // kept so serializeInternal can reuse the array
+ preSerializationCache = null;
+ }
+ if (serializationCache != null) {
+ recycledBuffer = serializationCache.array(); // replaces the array kept above when both caches are set
+ serializationCache = null;
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 6dd088ec281..527a2ec0509 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
@@ -79,8 +81,6 @@ public class VotingLogList {
* When an entry of index-term is strongly accepted by a node of acceptingNodeId, record the id in
* all entries whose index <= the accepted entry. If any entry is accepted by a quorum, remove it
* from the list.
- *
- * @return the lastly removed entry if any.
*/
public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
logger.debug(
@@ -141,4 +141,17 @@ public class VotingLogList {
STRONGLY_ACCEPTED,
WEAKLY_ACCEPTED
}
+
+ /**
+ * Safe index is the maximum index that all followers have reached. Entries below the index no
+ * longer need to be stored.
+ *
+ * @return the safe index, or -1 if stronglyAcceptedIndices has fewer than (node count - 1) entries
+ */
+ public long getSafeIndex() {
+ if (stronglyAcceptedIndices.size() < member.getAllNodes().size() - 1) {
+ return -1;
+ }
+ return stronglyAcceptedIndices.values().stream().min(Long::compareTo).orElse(-1L);
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 62cad1f6b82..26ed266509c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
public class ConfigChangeEntry extends Entry {
@@ -40,9 +41,9 @@ public class ConfigChangeEntry extends Entry {
}
@Override
- protected ByteBuffer serializeInternal() {
+ protected ByteBuffer serializeInternal(byte[] buffer) {
ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+ buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
index db6d7f40daf..231a83b4e0c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
public class EmptyEntry extends Entry {
@@ -36,9 +37,9 @@ public class EmptyEntry extends Entry {
}
@Override
- protected ByteBuffer serializeInternal() {
+ protected ByteBuffer serializeInternal(byte[] buffer) {
ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream(getDefaultSerializationBufferSize());
+ buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index d6ba9312cac..f6cb9ec3707 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -34,21 +34,25 @@ import java.util.Objects;
import static org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types.CLIENT_REQUEST;
-/** RequestLog contains a non-partitioned request like set storage group. */
+/**
+ * RequestLog contains a non-partitioned request like set storage group.
+ */
public class RequestEntry extends Entry {
private static final Logger logger = LoggerFactory.getLogger(RequestEntry.class);
private volatile IConsensusRequest request;
- public RequestEntry() {}
+ public RequestEntry() {
+ }
public RequestEntry(IConsensusRequest request) {
setRequest(request);
}
@Override
- protected ByteBuffer serializeInternal() {
- PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultSerializationBufferSize());
+ protected ByteBuffer serializeInternal(byte[] buffer) {
+ PublicBAOS byteArrayOutputStream =
+ buffer == null ? new PublicBAOS(getDefaultSerializationBufferSize()) : new PublicBAOS(buffer);
int requestSize = 0;
int requestPos = 0;
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index 1f99fd4e2c5..2cd4e39203a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
+import java.util.function.Supplier;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -47,8 +48,10 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
String name,
IStateMachine stateMachine,
RaftConfig config,
- Consumer<List<Entry>> unappliedEntryExaminer) {
- super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer);
+ Consumer<List<Entry>> unappliedEntryExaminer,
+ Supplier<Long> safeIndexProvider,
+ Consumer<Entry> entryRecycler) {
+ super(stableEntryManager, applier, name, stateMachine, config, unappliedEntryExaminer, safeIndexProvider, entryRecycler);
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 1aa12cbe6b7..0c5fc6e936a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
+import java.util.function.Supplier;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.IStateMachine;
@@ -107,6 +108,8 @@ public abstract class RaftLogManager {
protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private long committedEntrySize;
+ private Supplier<Long> safeIndexProvider;
+ private Consumer<Entry> entryRecycler;
protected RaftLogManager(
StableEntryManager stableEntryManager,
@@ -114,12 +117,15 @@ public abstract class RaftLogManager {
String name,
IStateMachine stateMachine,
RaftConfig config,
- Consumer<List<Entry>> unappliedEntryExaminer) {
+ Consumer<List<Entry>> unappliedEntryExaminer, Supplier<Long> safeIndexProvider,
+ Consumer<Entry> entryRecycler) {
this.logApplier = applier;
this.name = name;
this.stateMachine = stateMachine;
this.setStableEntryManager(stableEntryManager);
this.config = config;
+ this.safeIndexProvider = safeIndexProvider;
+ this.entryRecycler = entryRecycler;
initConf();
initEntries(unappliedEntryExaminer);
@@ -132,7 +138,7 @@ public abstract class RaftLogManager {
this.checkLogApplierExecutorService =
IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
- /** deletion check period of the submitted log */
+ /* deletion check period of the submitted log */
int logDeleteCheckIntervalSecond = config.getLogDeleteCheckIntervalSecond();
if (logDeleteCheckIntervalSecond > 0) {
@@ -147,7 +153,7 @@ public abstract class RaftLogManager {
this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex);
- /** flush log to file periodically */
+ /* flush log to file periodically */
if (config.isEnableRaftLogPersistence()) {
this.applyAllCommittedLogWhenStartUp();
}
@@ -203,14 +209,12 @@ public abstract class RaftLogManager {
*
* <p>
*
- * @throws IOException timeout exception
*/
public abstract void takeSnapshot(RaftMember member);
/**
* Update the raftNode's hardState(currentTerm,voteFor) and flush to disk.
*
- * @param state
*/
public void updateHardState(HardState state) {
getStableEntryManager().setHardStateAndFlush(state);
@@ -579,7 +583,7 @@ public abstract class RaftLogManager {
return entries.size() - 1;
}
- private void checkCompaction(List<Entry> entries) {
+ private List<Entry> checkCompaction(List<Entry> entries) {
boolean needToCompactLog = false;
// calculate the number of old committed entries to be reserved by entry number
int numToReserveForNew = minNumOfLogsInMem;
@@ -601,11 +605,13 @@ public abstract class RaftLogManager {
}
// reserve old committed entries with the minimum number
+ List<Entry> removedEntries = Collections.emptyList();
if (needToCompactLog) {
int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
int sizeToReserveForConfig = minNumOfLogsInMem;
- innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
+ removedEntries = innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
}
+ return removedEntries;
}
private void removedCommitted(List<Entry> entries) {
@@ -636,7 +642,6 @@ public abstract class RaftLogManager {
entry.notify();
}
}
- entry.setSerializationCache(null);
}
}
@@ -650,6 +655,7 @@ public abstract class RaftLogManager {
return;
}
+ List<Entry> removedEntries = Collections.emptyList();
try {
lock.writeLock().lock();
long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
@@ -669,13 +675,20 @@ public abstract class RaftLogManager {
System.nanoTime() - entry.createTime);
}
}
- checkCompaction(entries);
+ removedEntries = checkCompaction(entries);
commitEntries(entries);
applyEntries(entries);
Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.calOperationCostTimeFromStart(startTime);
} finally {
lock.writeLock().unlock();
}
+ recycleEntries(removedEntries);
+ }
+
+ protected void recycleEntries(List<Entry> removedEntries) { // passes each removed entry to entryRecycler
+ for (Entry removedEntry : removedEntries) { // the callers visible here invoke this after unlocking
+ entryRecycler.accept(removedEntry);
+ }
+ }
/**
@@ -816,24 +829,28 @@ public abstract class RaftLogManager {
/** check whether delete the committed log */
void checkDeleteLog() {
+ List<Entry> removedEntries = Collections.emptyList();
try {
lock.writeLock().lock();
if (appliedIndex - getFirstIndex() <= minNumOfLogsInMem) {
return;
}
- innerDeleteLog(minNumOfLogsInMem);
+ removedEntries = innerDeleteLog(minNumOfLogsInMem);
} catch (Exception e) {
logger.error("{}, error occurred when checking delete log", name, e);
} finally {
lock.writeLock().unlock();
}
+ recycleEntries(removedEntries);
}
- private void innerDeleteLog(int sizeToReserve) {
- long appliedLogNum = appliedIndex - getFirstIndex();
- long removeSize = appliedLogNum - sizeToReserve;
+ private List<Entry> innerDeleteLog(int sizeToReserve) {
+ long safeIndex = safeIndexProvider.get();
+ long indexToReserve = Math.max(appliedIndex, safeIndex);
+ long removableLogNum = indexToReserve - getFirstIndex();
+ long removeSize = removableLogNum - sizeToReserve;
if (removeSize <= 0) {
- return;
+ return Collections.emptyList();
}
long compactIndex = getFirstIndex() + removeSize;
@@ -847,7 +864,7 @@ public abstract class RaftLogManager {
removeSize,
commitIndex - getFirstIndex(),
appliedIndex);
- compactEntries(compactIndex);
+ List<Entry> removedEntries = compactEntries(compactIndex);
if (config.isEnableRaftLogPersistence()) {
getStableEntryManager().removeCompactedEntries(compactIndex);
}
@@ -857,16 +874,17 @@ public abstract class RaftLogManager {
getFirstIndex(),
getLastLogIndex(),
commitIndex - getFirstIndex());
+ return removedEntries;
}
- void compactEntries(long compactIndex) {
+ List<Entry> compactEntries(long compactIndex) {
long firstIndex = getFirstIndex();
if (compactIndex < firstIndex) {
logger.info(
"entries before request index ({}) have been compacted, and the compactIndex is ({})",
firstIndex,
compactIndex);
- return;
+ return Collections.emptyList();
}
long lastLogIndex = getLastLogIndex();
if (compactIndex >= lastLogIndex) {
@@ -877,9 +895,12 @@ public abstract class RaftLogManager {
for (int i = 0; i < index; i++) {
committedEntrySize -= entries.get(0).estimateSize();
}
+ List<Entry> removedEntries = Collections.emptyList();
if (index > 0) {
+ removedEntries = new ArrayList<>(entries.subList(0, index));
entries.subList(0, index).clear();
}
+ return removedEntries;
}
public Object getLogUpdateCondition(long logIndex) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
new file mode 100644
index 00000000000..a603624d764
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/recycle/EntryAllocator.java
@@ -0,0 +1,58 @@
+package org.apache.iotdb.consensus.natraft.protocol.log.recycle;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.function.Supplier;
+import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
+public class EntryAllocator<T extends Entry> {
+ private Queue<T> entryPool;
+ private Supplier<T> entryFactory;
+ private Queue<T> recyclingEntries;
+ private Supplier<Long> safeIndexProvider;
+
+ public EntryAllocator(RaftConfig config, Supplier<T> entryFactory, Supplier<Long> safeIndexProvider) {
+ this.entryPool = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity());
+ this.recyclingEntries = new ArrayBlockingQueue<>(config.getEntryAllocatorCapacity() / 2);
+ this.entryFactory = entryFactory;
+ this.safeIndexProvider = safeIndexProvider;
+ }
+
+ public T Allocate() {
+ T entry = entryPool.poll();
+ if (entry == null) {
+ entry = entryFactory.get();
+ }
+ return entry;
+ }
+
+ public void recycle(T entry) {
+ Long safeIndex = safeIndexProvider.get();
+ if (entry.getCurrLogIndex() <= safeIndex) {
+ entry.recycle();
+ entryPool.add(entry);
+ } else {
+ recyclingEntries.add(entry);
+ }
+
+ checkRecyclingEntries();
+ }
+
+ public void checkRecyclingEntries() {
+ Long safeIndex = safeIndexProvider.get();
+ while (!recyclingEntries.isEmpty()) {
+ T recyclingEntry = recyclingEntries.poll();
+ if (recyclingEntry != null && recyclingEntry.getCurrLogIndex() <= safeIndex) {
+ recyclingEntry.recycle();
+ entryPool.add(recyclingEntry);
+ } else {
+ if (recyclingEntry != null) {
+ recyclingEntries.add(recyclingEntry);
+ }
+ break;
+ }
+ }
+ }
+
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
index 90c2d0f22cf..5a3ceb2922d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PublicBAOS.java
@@ -37,6 +37,10 @@ public class PublicBAOS extends ByteArrayOutputStream {
super(size);
}
+ public PublicBAOS(byte[] buffer) { // uses the given array as this stream's backing buffer
+ this.buf = buffer; // NOTE(review): null is not rejected here — the visible callers check for null first
+ }
+
/**
* get current all bytes data
*