You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/09 09:21:11 UTC

[iotdb] branch master updated: [IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 61ef21ae22 [IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)
61ef21ae22 is described below

commit 61ef21ae2224b6ec5d57e0148a452a7d43577b11
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Feb 9 17:21:03 2023 +0800

    [IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)
---
 .../mtree/store/disk/cache/CacheMemoryManager.java | 47 ++++++++++++++++++++--
 .../schemaregion/SchemaRegionSchemaFileImpl.java   | 13 ++++--
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 27d71141e4..6ba8d98436 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -56,6 +56,15 @@ public class CacheMemoryManager {
   private volatile boolean hasReleaseTask;
   private int releaseCount = 0;
 
+  private static final int MAX_WAITING_TIME_WHEN_RELEASING = 10_000;
+  private final Object blockObject = new Object();
+
+  /**
+   * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
+   *
+   * @param store CachedMTreeStore
+   * @return LRUCacheManager
+   */
   public ICacheManager createLRUCacheManager(CachedMTreeStore store) {
     synchronized (storeList) {
       ICacheManager cacheManager = new LRUCacheManager();
@@ -73,12 +82,35 @@ public class CacheMemoryManager {
             CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_POOL.getName());
   }
 
+  /**
+   * Check the current memory usage. If the release threshold is exceeded and no release task is
+   * pending (hasReleaseTask is false), register a release task to free memory.
+   */
   public void ensureMemoryStatus() {
     if (memManager.isExceedReleaseThreshold() && !hasReleaseTask) {
       registerReleaseTask();
     }
   }
 
+  /**
+   * If there is a ReleaseTask or FlushTask, block the current thread for up to
+   * MAX_WAITING_TIME_WHEN_RELEASING milliseconds. The thread resumes when it is notified, when the
+   * timeout elapses, or when it is interrupted (the interrupt status is then restored).
+   */
+  public void waitIfReleasing() {
+    synchronized (blockObject) {
+      if (hasReleaseTask || hasFlushTask) {
+        try {
+          blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
+        } catch (InterruptedException e) {
+          // Restore the interrupt status so callers up the stack can still observe it.
+          Thread.currentThread().interrupt();
+          logger.warn("Interrupted while waiting for the release or flush task to finish.", e);
+        }
+      }
+    }
+  }
+
   private synchronized void registerReleaseTask() {
     if (hasReleaseTask) {
       return;
@@ -120,9 +152,13 @@ public class CacheMemoryManager {
                   .toArray(CompletableFuture[]::new))
           .join();
       releaseCount++;
-      hasReleaseTask = false;
-      if (memManager.isExceedFlushThreshold() && !hasFlushTask) {
-        registerFlushTask();
+      synchronized (blockObject) {
+        hasReleaseTask = false;
+        if (memManager.isExceedFlushThreshold() && !hasFlushTask) {
+          registerFlushTask();
+        } else {
+          blockObject.notifyAll();
+        }
       }
     }
   }
@@ -164,8 +200,11 @@ public class CacheMemoryManager {
                               flushTaskExecutor))
                   .toArray(CompletableFuture[]::new))
           .join();
-      hasFlushTask = false;
       flushCount++;
+      synchronized (blockObject) {
+        hasFlushTask = false;
+        blockObject.notifyAll();
+      }
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 7c66cca2c3..968680b455 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.metadata.logfile.SchemaLogWriter;
 import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
 import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
+import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheMemoryManager;
 import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
 import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
 import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
@@ -520,8 +521,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      logger.error("Series overflow when creating: [{}]", plan.getPath().getFullPath());
-      throw new SeriesOverflowException();
+      CacheMemoryManager.getInstance().waitIfReleasing();
+      if (!memoryStatistics.isAllowToCreateNewSeries()) {
+        logger.warn("Series overflow when creating: [{}]", plan.getPath().getFullPath());
+        throw new SeriesOverflowException();
+      }
     }
 
     if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(1)) {
@@ -640,7 +644,10 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
   public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
     int seriesCount = plan.getMeasurements().size();
     if (!memoryStatistics.isAllowToCreateNewSeries()) {
-      throw new SeriesOverflowException();
+      CacheMemoryManager.getInstance().waitIfReleasing();
+      if (!memoryStatistics.isAllowToCreateNewSeries()) {
+        throw new SeriesOverflowException();
+      }
     }
 
     if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(seriesCount)) {