You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/03/20 09:16:47 UTC

[iotdb] 01/01: IOTDB-854 Limit the memory footprint of the committed log cache

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch limit_memory_for_raftlog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a02653f8403dfbe75f5d7dbe79ec4fe84265e47f
Author: chaow <xu...@gmail.com>
AuthorDate: Sat Mar 20 16:40:54 2021 +0800

    IOTDB-854  Limit the memory footprint of the committed log cache
---
 .../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  6 ++++
 .../java/org/apache/iotdb/cluster/log/Log.java     | 10 +++++++
 .../cluster/log/manage/CommittedEntryManager.java  | 34 +++++++++++++++++++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 29 ++++++++++++++----
 .../iotdb/cluster/server/member/RaftMember.java    |  5 ++++
 .../org/apache/iotdb/cluster/common/TestUtils.java |  1 +
 .../cluster/log/manage/RaftLogManagerTest.java     | 35 ++++++++++++++++++++++
 8 files changed, 125 insertions(+), 5 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 08f7608..386bb25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -65,6 +65,8 @@ public class ClusterConfig {
   /** max number of committed logs in memory */
   private int maxNumOfLogsInMem = 1000;
 
+  private long maxMemorySizeForRaftLog = 536870912;
+
   /** deletion check period of the submitted log */
   private int logDeleteCheckIntervalSecond = -1;
 
@@ -381,6 +383,14 @@ public class ClusterConfig {
     this.maxRaftLogIndexSizeInMemory = maxRaftLogIndexSizeInMemory;
   }
 
+  public long getMaxMemorySizeForRaftLog() {
+    return maxMemorySizeForRaftLog;
+  }
+
+  public void setMaxMemorySizeForRaftLog(long maxMemorySizeForRaftLog) {
+    this.maxMemorySizeForRaftLog = maxMemorySizeForRaftLog;
+  }
+
   public int getMaxRaftLogPersistDataSizePerFile() {
     return maxRaftLogPersistDataSizePerFile;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d6bbf82..25f85e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -200,6 +200,12 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "max_num_of_logs_in_mem", String.valueOf(config.getMaxNumOfLogsInMem()))));
 
+    config.setMaxMemorySizeForRaftLog(
+        Long.parseLong(
+            properties.getProperty(
+                "max_memory_size_for_raft_log",
+                String.valueOf(config.getMaxMemorySizeForRaftLog()))));
+
     config.setLogDeleteCheckIntervalSecond(
         Integer.parseInt(
             properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 6977634..d2294eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -45,6 +45,8 @@ public abstract class Log implements Comparable<Log> {
   private long createTime;
   private long enqueueTime;
 
+  private int byteSize;
+
   public abstract ByteBuffer serialize();
 
   public abstract void deserialize(ByteBuffer buffer);
@@ -132,4 +134,12 @@ public abstract class Log implements Comparable<Log> {
   public void setEnqueueTime(long enqueueTime) {
     this.enqueueTime = enqueueTime;
   }
+
+  public long getByteSize() {
+    return byteSize;
+  }
+
+  public void setByteSize(int byteSize) {
+    this.byteSize = byteSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index a5d83a8..4e53e26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -41,6 +41,8 @@ public class CommittedEntryManager {
   // memory cache for logs which have been persisted in disk.
   private List<Log> entries;
 
+  private long entryTotalMemSize;
+
   /**
    * Note that it is better to use applyingSnapshot to update dummy entry immediately after this
    * instance is created.
@@ -48,6 +50,7 @@ public class CommittedEntryManager {
   CommittedEntryManager(int maxNumOfLogInMem) {
     entries = Collections.synchronizedList(new ArrayList<>(maxNumOfLogInMem));
     entries.add(new EmptyContentLog(-1, -1));
+    entryTotalMemSize = 0;
   }
 
   /**
@@ -209,6 +212,9 @@ public class CommittedEntryManager {
         0,
         new EmptyContentLog(
             entries.get(index).getCurrLogIndex(), entries.get(index).getCurrLogTerm()));
+    for (int i = 1; i <= index; i++) {
+      entryTotalMemSize -= entries.get(i).getByteSize();
+    }
     entries.subList(1, index + 1).clear();
   }
 
@@ -225,6 +231,9 @@ public class CommittedEntryManager {
     }
     long offset = appendingEntries.get(0).getCurrLogIndex() - getDummyIndex();
     if (entries.size() - offset == 0) {
+      for (int i = 0; i < appendingEntries.size(); i++) {
+        entryTotalMemSize += appendingEntries.get(i).getByteSize();
+      }
       entries.addAll(appendingEntries);
     } else if (entries.size() - offset > 0) {
       throw new TruncateCommittedEntryException(
@@ -246,4 +255,29 @@ public class CommittedEntryManager {
   List<Log> getAllEntries() {
     return entries;
   }
+
+  public long getEntryTotalMemSize() {
+    return entryTotalMemSize;
+  }
+
+  public void setEntryTotalMemSize(long entryTotalMemSize) {
+    this.entryTotalMemSize = entryTotalMemSize;
+  }
+
+  /**
+   * Count, walking back from the tail of entries, how many committed logs fit within maxMemSize.
+   *
+   * @param maxMemSize the memory budget, in bytes, for old committed logs
+   * @return the number of committed logs to reserve; the dummy entry at index 0 is never counted
+   */
+  public int maxLogNumShouldReserve(long maxMemSize) {
+    long totalSize = 0;
+    for (int i = entries.size() - 1; i >= 1; i--) {
+      if (totalSize + entries.get(i).getByteSize() > maxMemSize) {
+        return entries.size() - 1 - i;
+      }
+      totalSize += entries.get(i).getByteSize();
+    }
+    return entries.size() - 1; // every log fits; exclude the dummy entry at index 0
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 86b2d33..fce98bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,9 @@ public abstract class RaftLogManager {
   private int maxNumOfLogsInMem =
       ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
 
+  private long maxLogMemSize =
+      ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+
   /**
    * Each time new logs are appended, this condition will be notified so logs that have larger
    * indices but arrived earlier can proceed.
@@ -582,14 +585,30 @@ public abstract class RaftLogManager {
           .clear();
     }
     try {
-      int currentSize = committedEntryManager.getTotalSize();
-      int deltaSize = entries.size();
-      if (currentSize + deltaSize > maxNumOfLogsInMem) {
-        int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize;
+      boolean needCompactedLog = false;
+      int numToReserveForNew = minNumOfLogsInMem;
+      if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
+        needCompactedLog = true;
+        numToReserveForNew = maxNumOfLogsInMem - entries.size();
+      }
+
+      long newEntryMemSize = 0;
+      for (Log entry : entries) {
+        newEntryMemSize += entry.getByteSize();
+      }
+      int sizeToReserveForNew = minNumOfLogsInMem;
+      if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
+        needCompactedLog = true;
+        sizeToReserveForNew =
+            committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+      }
+
+      if (needCompactedLog) {
+        int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
         int sizeToReserveForConfig = minNumOfLogsInMem;
         startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
         synchronized (this) {
-          innerDeleteLog(Math.min(sizeToReserveForConfig, sizeToReserveForNew));
+          innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
         }
         Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 8783e6c..d128e56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -508,7 +508,9 @@ public abstract class RaftMember {
     }
 
     long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
+    int logByteSize = request.getEntry().length;
     Log log = LogParser.getINSTANCE().parse(request.entry);
+    log.setByteSize(logByteSize);
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
     long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
@@ -529,12 +531,15 @@ public abstract class RaftMember {
 
     long response;
     List<Log> logs = new ArrayList<>();
+    int logByteSize = 0;
     long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
     for (ByteBuffer buffer : request.getEntries()) {
       buffer.mark();
       Log log;
+      logByteSize = buffer.limit() - buffer.position();
       try {
         log = LogParser.getINSTANCE().parse(buffer);
+        log.setByteSize(logByteSize);
       } catch (BufferUnderflowException e) {
         buffer.reset();
         throw e;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 53becb4..d8aebfc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -126,6 +126,7 @@ public class TestUtils {
       Log log = new LargeTestLog();
       log.setCurrLogIndex(i);
       log.setCurrLogTerm(i);
+      log.setByteSize(8192);
       logList.add(log);
     }
     return logList;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
index 2eddaf8..4d47219 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
@@ -973,6 +973,41 @@ public class RaftLogManagerTest {
   }
 
   @Test
+  public void testInnerDeleteLogsWithLargeLog() {
+    long maxMemSize = ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+    int minNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMinNumOfLogsInMem();
+    int maxNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+    ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(10);
+    ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(10);
+    ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(1024 * 56);
+    CommittedEntryManager committedEntryManager =
+        new CommittedEntryManager(
+            ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+    RaftLogManager instance =
+        new TestRaftLogManager(
+            committedEntryManager, new SyncLogDequeSerializer(testIdentifier), logApplier);
+    List<Log> logs = TestUtils.prepareLargeTestLogs(12);
+
+    try {
+      instance.append(logs.subList(0, 7));
+      instance.maybeCommit(6, 6);
+      while (instance.getMaxHaveAppliedCommitIndex() < 6) {
+        // wait
+      }
+      instance.append(logs.subList(7, 12));
+      instance.maybeCommit(11, 11);
+
+      List<Log> entries = instance.getEntries(0, 12);
+      assertEquals(logs.subList(5, 12), entries);
+    } finally {
+      instance.close();
+      ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(maxNumOfLogsInMem);
+      ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(minNumOfLogsInMem);
+      ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(maxMemSize);
+    }
+  }
+
+  @Test
   @SuppressWarnings("java:S2925")
   public void testReapplyBlockedLogs() throws LogExecutionException, InterruptedException {
     CommittedEntryManager committedEntryManager =