You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/03/20 09:16:46 UTC

[iotdb] branch limit_memory_for_raftlog created (now a02653f)

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a change to branch limit_memory_for_raftlog
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at a02653f  IOTDB-854  Limit the memory foorprint of the committed log cache

This branch includes the following new commits:

     new a02653f  IOTDB-854  Limit the memory foorprint of the committed log cache

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: IOTDB-854 Limit the memory foorprint of the committed log cache

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch limit_memory_for_raftlog
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a02653f8403dfbe75f5d7dbe79ec4fe84265e47f
Author: chaow <xu...@gmail.com>
AuthorDate: Sat Mar 20 16:40:54 2021 +0800

    IOTDB-854  Limit the memory foorprint of the committed log cache
---
 .../apache/iotdb/cluster/config/ClusterConfig.java | 10 +++++++
 .../iotdb/cluster/config/ClusterDescriptor.java    |  6 ++++
 .../java/org/apache/iotdb/cluster/log/Log.java     | 10 +++++++
 .../cluster/log/manage/CommittedEntryManager.java  | 34 +++++++++++++++++++++
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 29 ++++++++++++++----
 .../iotdb/cluster/server/member/RaftMember.java    |  5 ++++
 .../org/apache/iotdb/cluster/common/TestUtils.java |  1 +
 .../cluster/log/manage/RaftLogManagerTest.java     | 35 ++++++++++++++++++++++
 8 files changed, 125 insertions(+), 5 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 08f7608..386bb25 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -65,6 +65,8 @@ public class ClusterConfig {
   /** max number of committed logs in memory */
   private int maxNumOfLogsInMem = 1000;
 
+  private long maxMemorySizeForRaftLog = 536870912;
+
   /** deletion check period of the submitted log */
   private int logDeleteCheckIntervalSecond = -1;
 
@@ -381,6 +383,14 @@ public class ClusterConfig {
     this.maxRaftLogIndexSizeInMemory = maxRaftLogIndexSizeInMemory;
   }
 
+  public long getMaxMemorySizeForRaftLog() {
+    return maxMemorySizeForRaftLog;
+  }
+
+  public void setMaxMemorySizeForRaftLog(long maxMemorySizeForRaftLog) {
+    this.maxMemorySizeForRaftLog = maxMemorySizeForRaftLog;
+  }
+
   public int getMaxRaftLogPersistDataSizePerFile() {
     return maxRaftLogPersistDataSizePerFile;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index d6bbf82..25f85e0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -200,6 +200,12 @@ public class ClusterDescriptor {
             properties.getProperty(
                 "max_num_of_logs_in_mem", String.valueOf(config.getMaxNumOfLogsInMem()))));
 
+    config.setMaxMemorySizeForRaftLog(
+        Long.parseLong(
+            properties.getProperty(
+                "max_memory_size_for_raft_log",
+                String.valueOf(config.getMaxMemorySizeForRaftLog()))));
+
     config.setLogDeleteCheckIntervalSecond(
         Integer.parseInt(
             properties.getProperty(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
index 6977634..d2294eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java
@@ -45,6 +45,8 @@ public abstract class Log implements Comparable<Log> {
   private long createTime;
   private long enqueueTime;
 
+  private int byteSize;
+
   public abstract ByteBuffer serialize();
 
   public abstract void deserialize(ByteBuffer buffer);
@@ -132,4 +134,12 @@ public abstract class Log implements Comparable<Log> {
   public void setEnqueueTime(long enqueueTime) {
     this.enqueueTime = enqueueTime;
   }
+
+  public long getByteSize() {
+    return byteSize;
+  }
+
+  public void setByteSize(int byteSize) {
+    this.byteSize = byteSize;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
index a5d83a8..4e53e26 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/CommittedEntryManager.java
@@ -41,6 +41,8 @@ public class CommittedEntryManager {
   // memory cache for logs which have been persisted in disk.
   private List<Log> entries;
 
+  private long entryTotalMemSize;
+
   /**
    * Note that it is better to use applyingSnapshot to update dummy entry immediately after this
    * instance is created.
@@ -48,6 +50,7 @@ public class CommittedEntryManager {
   CommittedEntryManager(int maxNumOfLogInMem) {
     entries = Collections.synchronizedList(new ArrayList<>(maxNumOfLogInMem));
     entries.add(new EmptyContentLog(-1, -1));
+    entryTotalMemSize = 0;
   }
 
   /**
@@ -209,6 +212,9 @@ public class CommittedEntryManager {
         0,
         new EmptyContentLog(
             entries.get(index).getCurrLogIndex(), entries.get(index).getCurrLogTerm()));
+    for (int i = 1; i <= index; i++) {
+      entryTotalMemSize -= entries.get(i).getByteSize();
+    }
     entries.subList(1, index + 1).clear();
   }
 
@@ -225,6 +231,9 @@ public class CommittedEntryManager {
     }
     long offset = appendingEntries.get(0).getCurrLogIndex() - getDummyIndex();
     if (entries.size() - offset == 0) {
+      for (int i = 0; i < appendingEntries.size(); i++) {
+        entryTotalMemSize += appendingEntries.get(i).getByteSize();
+      }
       entries.addAll(appendingEntries);
     } else if (entries.size() - offset > 0) {
       throw new TruncateCommittedEntryException(
@@ -246,4 +255,29 @@ public class CommittedEntryManager {
   List<Log> getAllEntries() {
     return entries;
   }
+
+  public long getEntryTotalMemSize() {
+    return entryTotalMemSize;
+  }
+
+  public void setEntryTotalMemSize(long entryTotalMemSize) {
+    this.entryTotalMemSize = entryTotalMemSize;
+  }
+
+  /**
+   * check how many logs could be reserved in memory.
+   *
+   * @param maxMemSize the max memory size for old committed log
+   * @return max num to reserve old committed log
+   */
+  public int maxLogNumShouldReserve(long maxMemSize) {
+    long totalSize = 0;
+    for (int i = entries.size() - 1; i >= 1; i--) {
+      if (totalSize + entries.get(i).getByteSize() > maxMemSize) {
+        return entries.size() - 1 - i;
+      }
+      totalSize += entries.get(i).getByteSize();
+    }
+    return entries.size();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 86b2d33..fce98bc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -100,6 +100,9 @@ public abstract class RaftLogManager {
   private int maxNumOfLogsInMem =
       ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
 
+  private long maxLogMemSize =
+      ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+
   /**
    * Each time new logs are appended, this condition will be notified so logs that have larger
    * indices but arrived earlier can proceed.
@@ -582,14 +585,30 @@ public abstract class RaftLogManager {
           .clear();
     }
     try {
-      int currentSize = committedEntryManager.getTotalSize();
-      int deltaSize = entries.size();
-      if (currentSize + deltaSize > maxNumOfLogsInMem) {
-        int sizeToReserveForNew = maxNumOfLogsInMem - deltaSize;
+      boolean needCompactedLog = false;
+      int numToReserveForNew = minNumOfLogsInMem;
+      if (committedEntryManager.getTotalSize() + entries.size() > maxNumOfLogsInMem) {
+        needCompactedLog = true;
+        numToReserveForNew = maxNumOfLogsInMem - entries.size();
+      }
+
+      long newEntryMemSize = 0;
+      for (Log entry : entries) {
+        newEntryMemSize += entry.getByteSize();
+      }
+      int sizeToReserveForNew = minNumOfLogsInMem;
+      if (newEntryMemSize + committedEntryManager.getEntryTotalMemSize() > maxLogMemSize) {
+        needCompactedLog = true;
+        sizeToReserveForNew =
+            committedEntryManager.maxLogNumShouldReserve(maxLogMemSize - newEntryMemSize);
+      }
+
+      if (needCompactedLog) {
+        int numForNew = Math.min(numToReserveForNew, sizeToReserveForNew);
         int sizeToReserveForConfig = minNumOfLogsInMem;
         startTime = Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.getOperationStartTime();
         synchronized (this) {
-          innerDeleteLog(Math.min(sizeToReserveForConfig, sizeToReserveForNew));
+          innerDeleteLog(Math.min(sizeToReserveForConfig, numForNew));
         }
         Statistic.RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS.calOperationCostTimeFromStart(startTime);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 8783e6c..d128e56 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -508,7 +508,9 @@ public abstract class RaftMember {
     }
 
     long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
+    int logByteSize = request.getEntry().length;
     Log log = LogParser.getINSTANCE().parse(request.entry);
+    log.setByteSize(logByteSize);
     Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.calOperationCostTimeFromStart(startTime);
 
     long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
@@ -529,12 +531,15 @@ public abstract class RaftMember {
 
     long response;
     List<Log> logs = new ArrayList<>();
+    int logByteSize = 0;
     long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
     for (ByteBuffer buffer : request.getEntries()) {
       buffer.mark();
       Log log;
+      logByteSize = buffer.limit() - buffer.position();
       try {
         log = LogParser.getINSTANCE().parse(buffer);
+        log.setByteSize(logByteSize);
       } catch (BufferUnderflowException e) {
         buffer.reset();
         throw e;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
index 53becb4..d8aebfc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java
@@ -126,6 +126,7 @@ public class TestUtils {
       Log log = new LargeTestLog();
       log.setCurrLogIndex(i);
       log.setCurrLogTerm(i);
+      log.setByteSize(8192);
       logList.add(log);
     }
     return logList;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
index 2eddaf8..4d47219 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/manage/RaftLogManagerTest.java
@@ -973,6 +973,41 @@ public class RaftLogManagerTest {
   }
 
   @Test
+  public void testInnerDeleteLogsWithLargeLog() {
+    long maxMemSize = ClusterDescriptor.getInstance().getConfig().getMaxMemorySizeForRaftLog();
+    int minNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMinNumOfLogsInMem();
+    int maxNumOfLogsInMem = ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem();
+    ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(10);
+    ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(10);
+    ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(1024 * 56);
+    CommittedEntryManager committedEntryManager =
+        new CommittedEntryManager(
+            ClusterDescriptor.getInstance().getConfig().getMaxNumOfLogsInMem());
+    RaftLogManager instance =
+        new TestRaftLogManager(
+            committedEntryManager, new SyncLogDequeSerializer(testIdentifier), logApplier);
+    List<Log> logs = TestUtils.prepareLargeTestLogs(12);
+
+    try {
+      instance.append(logs.subList(0, 7));
+      instance.maybeCommit(6, 6);
+      while (instance.getMaxHaveAppliedCommitIndex() < 6) {
+        // wait
+      }
+      instance.append(logs.subList(7, 12));
+      instance.maybeCommit(11, 11);
+
+      List<Log> entries = instance.getEntries(0, 12);
+      assertEquals(logs.subList(5, 12), entries);
+    } finally {
+      instance.close();
+      ClusterDescriptor.getInstance().getConfig().setMaxNumOfLogsInMem(maxNumOfLogsInMem);
+      ClusterDescriptor.getInstance().getConfig().setMinNumOfLogsInMem(minNumOfLogsInMem);
+      ClusterDescriptor.getInstance().getConfig().setMaxMemorySizeForRaftLog(maxMemSize);
+    }
+  }
+
+  @Test
   @SuppressWarnings("java:S2925")
   public void testReapplyBlockedLogs() throws LogExecutionException, InterruptedException {
     CommittedEntryManager committedEntryManager =