You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/11/14 13:44:45 UTC
[iotdb] branch multi_leader_memory_pendingBatch_control updated: refine dispatcher logic
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multi_leader_memory_pendingBatch_control by this push:
new dd0b79582d refine dispatcher logic
dd0b79582d is described below
commit dd0b79582dc92a1c65348d693fcd6d9a295d2e8c
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Mon Nov 14 21:44:35 2022 +0800
refine dispatcher logic
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../multileader/logdispatcher/LogDispatcher.java | 52 +++++++++++-----------
.../multileader/logdispatcher/SyncStatus.java | 2 +-
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e2c44c2570..8c7a443d3c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -349,47 +349,47 @@ public class LogDispatcher {
public PendingBatch getBatch() {
long startIndex = syncStatus.getNextSendingIndex();
- long maxIndexWhenBufferedRequestEmpty = startIndex;
- logger.debug("[GetBatch] startIndex: {}", startIndex);
- if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) {
- // Use drainTo instead of poll to reduce lock overhead
+ long maxIndex;
+ synchronized (impl.getIndexObject()) {
+ maxIndex = impl.getIndex() + 1;
logger.debug(
- "{} : pendingRequest Size: {}, bufferedRequest size: {}",
+ "{}: startIndex: {}, maxIndex: {}, pendingRequest size: {}, bufferedRequest size: {}",
impl.getThisNode().getGroupId(),
+ startIndex,
+ maxIndex,
getPendingRequestSize(),
bufferedRequest.size());
- synchronized (impl.getIndexObject()) {
- pendingRequest.drainTo(
- bufferedRequest,
- config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
- maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
- }
- // remove all request that searchIndex < startIndex
- Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
- while (iterator.hasNext()) {
- IndexedConsensusRequest request = iterator.next();
- if (request.getSearchIndex() < startIndex) {
- iterator.remove();
- releaseReservedMemory(request);
- } else {
- break;
- }
+ // Use drainTo instead of poll to reduce lock overhead
+ pendingRequest.drainTo(
+ bufferedRequest,
+ config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size());
+ }
+ // remove all request that searchIndex < startIndex
+ Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ while (iterator.hasNext()) {
+ IndexedConsensusRequest request = iterator.next();
+ if (request.getSearchIndex() < startIndex) {
+ iterator.remove();
+ releaseReservedMemory(request);
+ } else {
+ break;
}
}
+
PendingBatch batches = new PendingBatch(config);
// This condition will be executed in several scenarios:
// 1. restart
// 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed
// up. To prevent inconsistency here, we use the synchronized logic when calculate value of
- // `maxIndexWhenBufferedRequestEmpty`
+ // `maxIndex`
if (bufferedRequest.isEmpty()) {
- constructBatchFromWAL(startIndex, maxIndexWhenBufferedRequestEmpty, batches);
+ constructBatchFromWAL(startIndex, maxIndex, batches);
batches.buildIndex();
logger.debug(
"{} : accumulated a {} from wal when empty", impl.getThisNode().getGroupId(), batches);
} else {
// Notice that prev searchIndex >= startIndex
- Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator();
+ iterator = bufferedRequest.iterator();
IndexedConsensusRequest prev = iterator.next();
// Prevents gap between logs. For example, some requests are not written into the queue when
@@ -481,14 +481,14 @@ public class LogDispatcher {
logger.warn("wait for next WAL entry is interrupted");
}
IndexedConsensusRequest data = walEntryIterator.next();
- if (targetIndex > data.getSearchIndex()) {
+ if (data.getSearchIndex() < targetIndex) {
// if the index of request is smaller than currentIndex, then continue
logger.warn(
"search for one Entry which index is {}, but find a smaller one, index : {}",
targetIndex,
data.getSearchIndex());
continue;
- } else if (targetIndex < data.getSearchIndex()) {
+ } else if (data.getSearchIndex() > targetIndex) {
logger.warn(
"search for one Entry which index is {}, but find a larger one, index : {}",
targetIndex,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e0ec7d4023..25af8b8fcd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -80,7 +80,7 @@ public class SyncStatus {
public void free() {
long size = 0;
for (PendingBatch pendingBatch : pendingBatches) {
- size = pendingBatch.getSerializedSize();
+ size += pendingBatch.getSerializedSize();
}
pendingBatches.clear();
multiLeaderMemoryManager.free(size);