You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/03/20 09:16:47 UTC
[iotdb] 01/01: IOTDB-854 Limit the memory foorprint of the
committed log cache
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch limit_memory_for_raftlog
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a02653f8403dfbe75f5d7dbe79ec4fe84265e47f
Author: chaow <xu...@gmail.com>
AuthorDate: Sat Mar 20 16:40:54 2021 +0800
IOTDB-854 Limit the memory foorprint of the committed log cache
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++++++
.../iotdb/cluster/config/ClusterDescriptor.java | 6 ++++
.../java/org/apache/iotdb/cluster/log/Log.java | 10 +++++++
.../cluster/log/manage/CommittedEntryManager.java | 34 +++++++++++++++++++++
.../iotdb/cluster/log/manage/RaftLogManager.java | 29 ++++++++++++++----
.../iotdb/cluster/server/member/RaftMember.java | 5 ++++
.../org/apache/iotdb/cluster/common/TestUtils.java | 1 +
.../cluster/log/manage/RaftLogManagerTest.java | 35 ++++++++++++++++++++++
8 files changed, 125 insertions(+), 5 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 08f7608..386bb25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -65,6 +65,8 @@ public class ClusterConfig {
/** max number of committed logs in memory */
private int maxNumOfLogsInMem = 1000;
+ private long maxMemorySizeForRaftLog = 536870912;
+
/** deletion check period of the submitted log */
private int logDeleteCheckIntervalSecond = -1;
@@ -381,6 +383,14 @@ public class ClusterConfig {
this.maxRaftLogIndexSizeInMemory = maxRaftLogIndexSizeInMemory;
}
+ public long getMaxMemorySizeForRaftLog() {
+ return maxMemorySizeForRaftLog;
+ }
+
+ public void setMaxMemorySizeForRaftLog(long maxMemorySizeForRaftLog) {
+ this.maxMemorySizeForRaftLog = maxMemorySizeForRaftLog;
+ }
+
public int getMaxRaftLogPersistDataSizePerFile() {
return maxRaftLogPersistDataSizePerFile;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d6bbf82..25f85e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -200,6 +200,12 @@ public class ClusterDescriptor {
properties.getProperty(
"max_num_of_logs_in_mem", String.valueOf(config.getMaxNumOfLogsInMem()))));
+ config.setMaxMemorySizeForRaftLog(
+ Long.parseLong(
+ properties.getProperty(
+ "max_memory_size_for_raft_log",
+ String.valueOf(config.getMaxMemorySizeForRaftLog()))));
+
config.setLogDeleteCheckIntervalSecond(
Integer.parseInt(
properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 6977634..d2294eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -45,6 +45,8 @@ public abstract class Log implements Comparable<Log> {
private long createTime;
private long enqueueTime;
+ private int byteSize;
+
public abstract ByteBuffer serialize();
public abstract void deserialize(ByteBuffer buffer);
@@ -132,4 +134,12 @@ public abstract class Log implements Comparable<Log> {
public void setEnqueueTime(long enqueueTime) {
this.enqueueTime = enqueueTime;
}
+
+ public long getByteSize() {
+ return byteSize;
+ }
+
+ public void setByteSize(int byteSize) {
+ this.byteSize = byteSize;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index a5d83a8..4e53e26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -41,6 +41,8 @@ public class CommittedEntryManager {
// memory cache for logs which have been persisted in disk.
private List<Log> entries;
+ private long entryTotalMemSize;
+
/**
* Note that it is better to use applyingSnapshot to update dummy entry immediately after this
* instance is created.
@@ -48,6 +50,7 @@ public class CommittedEntryManager {
CommittedEntryManager(int maxNumOfLogInMem) {
entries = Collections.synchronizedList(new ArrayList<>(maxNumOfLogInMem));
entries.add(new EmptyContentLog(-1, -1));
+ entryTotalMemSize = 0;
}
/**
@@ -209,6 +212,9 @@ public class CommittedEntryManager {
0,
new EmptyContentLog(
entries.get(index).getCurrLogIndex(), entries.get(index).getCurrLogTerm()));
+ for (int i = 1; i <= index; i++) {
+ entryTotalMemSize -= entries.get(i).getByteSize();
+ }
entries.subList(1, index + 1).clear();
}
@@ -225,6 +231,9 @@ public class CommittedEntryManager {
}
long offset = appendingEntries.get(0).getCurrLogIndex() - getDummyIndex();
if (entries.size() - offset == 0) {
+ for (int i = 0; i < appendingEntries.size(); i++) {
+ entryTotalMemSize += appendingEntries.get(i).getByteSize();
+ }
entries.addAll(appendingEntries);
} else if (entries.size() - offset > 0) {
throw new TruncateCommittedEntryException(
@@ -246,4 +255,29 @@ public class CommittedEntryManager {
List<Log> getAllEntries() {
return entries;
}
+
+ public long getEntryTotalMemSize() {
+ return entryTotalMemSize;
+ }
+
+ public void setEntryTotalMemSize(long entryTotalMemSize) {
+ this.entryTotalMemSize = entryTotalMemSize;
+ }
+
+ /**
+ * check how many logs could be reserved in memory.
+ *
+ * @param maxMemSize the max memory size for old committed log
+ * @return max num to reserve old committed log
+ */
+ public int maxLogNumShouldReserve(long maxMemSize) {
+ long totalSize = 0;
+ for (int i = entries.size() - 1; i >= 1; i--) {
+ if (totalSize + entries.get(i).getByteSize() > maxMemSize) {
+ return entries.size() - 1 - i;
+ }
+ totalSize += entries.get(i).getByteSize();
+ }
+ return entries.size();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 86b2d33..fce98bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,9 @@ public abstract class RaftLogManager {
private int maxNumOfLogsInMem =
ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+ private long maxLogMemSize =
+ ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+
/**
* Each time new logs are appended, this condition will be notified so logs that have larger
* indices but arrived earlier can proceed.
@@ -582,14 +585,30 @@ public abstract class RaftLogManager {
.clear();
}
try {
- int currentSize = committedEntryManager.getTotalSize();
- int deltaSize = entries.size();
- if (currentSize + deltaSize > maxNumOfLogsInMem) {
- int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize;
+ boolean needCompactedLog = false;
+ int numToReserveForNew = minNumOfLogsInMem;
+ if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
+ needCompactedLog = true;
+ numToReserveForNew = maxNumOfLogsInMem - entries.size();
+ }
+
+ long newEntryMemSize = 0;
+ for (Log entry : entries) {
+ newEntryMemSize += entry.getByteSize();
+ }
+ int sizeToReserveForNew = minNumOfLogsInMem;
+ if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
+ needCompactedLog = true;
+ sizeToReserveForNew =
+ committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+ }
+
+ if (needCompactedLog) {
+ int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
int sizeToReserveForConfig = minNumOfLogsInMem;
startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
synchronized (this) {
- innerDeleteLog(Math.min(sizeToReserveForConfig, sizeToReserveForNew));
+ innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
}
Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 8783e6c..d128e56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -508,7 +508,9 @@ public abstract class RaftMember {
}
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
+ int logByteSize = request.getEntry().length;
Log log = LogParser.getINSTANCE().parse(request.entry);
+ log.setByteSize(logByteSize);
Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
@@ -529,12 +531,15 @@ public abstract class RaftMember {
long response;
List<Log> logs = new ArrayList<>();
+ int logByteSize = 0;
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
for (ByteBuffer buffer : request.getEntries()) {
buffer.mark();
Log log;
+ logByteSize = buffer.limit() - buffer.position();
try {
log = LogParser.getINSTANCE().parse(buffer);
+ log.setByteSize(logByteSize);
} catch (BufferUnderflowException e) {
buffer.reset();
throw e;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 53becb4..d8aebfc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -126,6 +126,7 @@ public class TestUtils {
Log log = new LargeTestLog();
log.setCurrLogIndex(i);
log.setCurrLogTerm(i);
+ log.setByteSize(8192);
logList.add(log);
}
return logList;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
index 2eddaf8..4d47219 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
@@ -973,6 +973,41 @@ public class RaftLogManagerTest {
}
@Test
+ public void testInnerDeleteLogsWithLargeLog() {
+ long maxMemSize = ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+ int minNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMinNumOfLogsInMem();
+ int maxNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+ ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(10);
+ ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(10);
+ ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(1024 * 56);
+ CommittedEntryManager committedEntryManager =
+ new CommittedEntryManager(
+ ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+ RaftLogManager instance =
+ new TestRaftLogManager(
+ committedEntryManager, new SyncLogDequeSerializer(testIdentifier), logApplier);
+ List<Log> logs = TestUtils.prepareLargeTestLogs(12);
+
+ try {
+ instance.append(logs.subList(0, 7));
+ instance.maybeCommit(6, 6);
+ while (instance.getMaxHaveAppliedCommitIndex() < 6) {
+ // wait
+ }
+ instance.append(logs.subList(7, 12));
+ instance.maybeCommit(11, 11);
+
+ List<Log> entries = instance.getEntries(0, 12);
+ assertEquals(logs.subList(5, 12), entries);
+ } finally {
+ instance.close();
+ ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(maxNumOfLogsInMem);
+ ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(minNumOfLogsInMem);
+ ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(maxMemSize);
+ }
+ }
+
+ @Test
@SuppressWarnings("java:S2925")
public void testReapplyBlockedLogs() throws LogExecutionException, InterruptedException {
CommittedEntryManager committedEntryManager =