You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/11/10 01:20:07 UTC
[iotdb] branch multi_leader_memory_pendingBatch_control updated: Add memory control of SyncStatus and strength memory control of Queue (#7956)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/multi_leader_memory_pendingBatch_control by this push:
new f99c707673 Add memory control of SyncStatus and strength memory control of Queue (#7956)
f99c707673 is described below
commit f99c707673b895686da2a5e6827ab9f85ba142fa
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Thu Nov 10 09:20:01 2022 +0800
Add memory control of SyncStatus and strength memory control of Queue (#7956)
---
.../apache/iotdb/consensus/config/MultiLeaderConfig.java | 16 ++++++++++++----
.../consensus/multileader/MultiLeaderConsensus.java | 4 +++-
.../multileader/logdispatcher/LogDispatcher.java | 12 +++++++++---
.../logdispatcher/MultiLeaderMemoryManager.java | 10 +++++++---
.../multileader/logdispatcher/PendingBatch.java | 4 ++++
.../consensus/multileader/logdispatcher/SyncStatus.java | 15 ++++++++++++++-
6 files changed, 49 insertions(+), 12 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 8c7abec0ce..072dc00c37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -197,7 +197,6 @@ public class MultiLeaderConfig {
public static class Replication {
private final int maxRequestNumPerBatch;
-
private final int maxSizePerBatch;
private final int maxPendingBatch;
private final int maxWaitingTimeForAccumulatingBatchInMs;
@@ -206,7 +205,8 @@ public class MultiLeaderConfig {
private final long walThrottleThreshold;
private final long throttleTimeOutMs;
private final long checkpointGap;
- private final Long allocateMemoryForConsensus;
+ private final long allocateMemoryForConsensus;
+ private final long allocateMemoryForQueue;
private Replication(
int maxRequestNumPerBatch,
@@ -218,7 +218,8 @@ public class MultiLeaderConfig {
long walThrottleThreshold,
long throttleTimeOutMs,
long checkpointGap,
- long allocateMemoryForConsensus) {
+ long allocateMemoryForConsensus,
+ double maxMemoryRatioForQueue) {
this.maxRequestNumPerBatch = maxRequestNumPerBatch;
this.maxSizePerBatch = maxSizePerBatch;
this.maxPendingBatch = maxPendingBatch;
@@ -229,6 +230,7 @@ public class MultiLeaderConfig {
this.throttleTimeOutMs = throttleTimeOutMs;
this.checkpointGap = checkpointGap;
this.allocateMemoryForConsensus = allocateMemoryForConsensus;
+ this.allocateMemoryForQueue = (long) (allocateMemoryForConsensus * maxMemoryRatioForQueue);
}
public int getMaxRequestNumPerBatch() {
@@ -271,6 +273,10 @@ public class MultiLeaderConfig {
return allocateMemoryForConsensus;
}
+ public long getAllocateMemoryForQueue() {
+ return allocateMemoryForQueue;
+ }
+
public static Replication.Builder newBuilder() {
return new Replication.Builder();
}
@@ -288,6 +294,7 @@ public class MultiLeaderConfig {
private long throttleTimeOutMs = TimeUnit.SECONDS.toMillis(30);
private long checkpointGap = 500;
private long allocateMemoryForConsensus;
+ private double maxMemoryRatioForQueue = 0.6;
public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) {
this.maxRequestNumPerBatch = maxRequestNumPerBatch;
@@ -351,7 +358,8 @@ public class MultiLeaderConfig {
walThrottleThreshold,
throttleTimeOutMs,
checkpointGap,
- allocateMemoryForConsensus);
+ allocateMemoryForConsensus,
+ maxMemoryRatioForQueue);
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 7a86bdf540..3a6d7c1bf5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -99,7 +99,9 @@ public class MultiLeaderConsensus implements IConsensus {
new SyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
// init multiLeader memory manager
MultiLeaderMemoryManager.getInstance()
- .init(config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus());
+ .init(
+ config.getMultiLeaderConfig().getReplication().getAllocateMemoryForConsensus(),
+ config.getMultiLeaderConfig().getReplication().getAllocateMemoryForQueue());
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 4880e2aae4..e2c44c2570 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -243,7 +243,7 @@ public class LogDispatcher {
/** try to offer a request into queue with memory control */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
- if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize())) {
+ if (!multiLeaderMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) {
return false;
}
boolean success;
@@ -268,12 +268,18 @@ public class LogDispatcher {
public void stop() {
stopped = true;
+ long requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : pendingRequest) {
- multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ requestSize += indexedConsensusRequest.getSerializedSize();
}
+ pendingRequest.clear();
+ multiLeaderMemoryManager.free(requestSize);
+ requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : bufferedRequest) {
- multiLeaderMemoryManager.free(indexedConsensusRequest.getSerializedSize());
+ requestSize += indexedConsensusRequest.getSerializedSize();
}
+ multiLeaderMemoryManager.free(requestSize);
+ syncStatus.free();
MetricService.getInstance().removeMetricSet(metrics);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
index fb64f8a489..296fd1c716 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/MultiLeaderMemoryManager.java
@@ -31,16 +31,19 @@ public class MultiLeaderMemoryManager {
private static final Logger logger = LoggerFactory.getLogger(MultiLeaderMemoryManager.class);
private final AtomicLong memorySizeInByte = new AtomicLong(0);
private Long maxMemorySizeInByte = Runtime.getRuntime().maxMemory() / 10;
+ private Long maxMemorySizeForQueueInByte = Runtime.getRuntime().maxMemory() / 100 * 8;
private MultiLeaderMemoryManager() {
MetricService.getInstance().addMetricSet(new MultiLeaderMemoryManagerMetrics(this));
}
- public boolean reserve(long size) {
+ public boolean reserve(long size, boolean fromQueue) {
AtomicBoolean result = new AtomicBoolean(false);
memorySizeInByte.updateAndGet(
(memorySize) -> {
- if (size > maxMemorySizeInByte - memorySize) {
+ long remainSize =
+ (fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte) - memorySize;
+ if (memorySize > remainSize) {
logger.debug(
"consensus memory limited. required: {}, used: {}, total: {}",
size,
@@ -70,8 +73,9 @@ public class MultiLeaderMemoryManager {
currentUsedMemory);
}
- public void init(long maxMemorySize) {
+ public void init(long maxMemorySize, long maxMemorySizeForQueue) {
this.maxMemorySizeInByte = maxMemorySize;
+ this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
}
long getMemorySizeInByte() {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index 920e781b09..5d4685b19e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -87,6 +87,10 @@ public class PendingBatch {
return batches.isEmpty();
}
+ public long getSerializedSize() {
+ return serializedSize;
+ }
+
@Override
public String toString() {
return "PendingBatch{"
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index 3549c1158b..e0ec7d4023 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -31,6 +31,8 @@ public class SyncStatus {
private final MultiLeaderConfig config;
private final IndexController controller;
private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
+ private final MultiLeaderMemoryManager multiLeaderMemoryManager =
+ MultiLeaderMemoryManager.getInstance();
public SyncStatus(IndexController controller, MultiLeaderConfig config) {
this.controller = controller;
@@ -40,7 +42,8 @@ public class SyncStatus {
/** we may block here if the synchronization pipeline is full */
public void addNextBatch(PendingBatch batch) throws InterruptedException {
synchronized (this) {
- while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()) {
+ while (pendingBatches.size() >= config.getReplication().getMaxPendingBatch()
+ || !multiLeaderMemoryManager.reserve(batch.getSerializedSize(), false)) {
wait();
}
pendingBatches.add(batch);
@@ -61,6 +64,7 @@ public class SyncStatus {
while (current.isSynced()) {
controller.updateAndGet(current.getEndIndex());
iterator.remove();
+ multiLeaderMemoryManager.free(current.getSerializedSize());
if (iterator.hasNext()) {
current = iterator.next();
} else {
@@ -73,6 +77,15 @@ public class SyncStatus {
}
}
+ public void free() {
+ long size = 0;
+ for (PendingBatch pendingBatch : pendingBatches) {
+ size = pendingBatch.getSerializedSize();
+ }
+ pendingBatches.clear();
+ multiLeaderMemoryManager.free(size);
+ }
+
/** Gets the first index that is not currently synchronized */
public long getNextSendingIndex() {
// we do not use ReentrantReadWriteLock because there will be only one thread reading this field