You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/09 09:21:11 UTC
[iotdb] branch master updated: [IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 61ef21ae22 [IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)
61ef21ae22 is described below
commit 61ef21ae2224b6ec5d57e0148a452a7d43577b11
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Thu Feb 9 17:21:03 2023 +0800
[IOTDB-5510] Block and wait to create timeseries when releasing memory (#9030)
---
.../mtree/store/disk/cache/CacheMemoryManager.java | 47 ++++++++++++++++++++--
.../schemaregion/SchemaRegionSchemaFileImpl.java | 13 ++++--
2 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 27d71141e4..6ba8d98436 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -56,6 +56,15 @@ public class CacheMemoryManager {
private volatile boolean hasReleaseTask;
private int releaseCount = 0;
+ private static final int MAX_WAITING_TIME_WHEN_RELEASING = 10_000;
+ private final Object blockObject = new Object();
+
+ /**
+ * Create and allocate LRUCacheManager to the corresponding CachedMTreeStore.
+ *
+ * @param store CachedMTreeStore
+ * @return LRUCacheManager
+ */
public ICacheManager createLRUCacheManager(CachedMTreeStore store) {
synchronized (storeList) {
ICacheManager cacheManager = new LRUCacheManager();
@@ -73,12 +82,35 @@ public class CacheMemoryManager {
CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_POOL.getName());
}
+ /**
+ * Check the current memory usage. If the release threshold is exceeded, trigger the task to
+ * perform an internal and external memory swap to release the memory.
+ */
public void ensureMemoryStatus() {
if (memManager.isExceedReleaseThreshold() && !hasReleaseTask) {
registerReleaseTask();
}
}
+ /**
+ * If there is a ReleaseTask or FlushTask, block the current thread to wait up to
+ * MAX_WAITING_TIME_WHEN_RELEASING. The thread will be woken up if the ReleaseTask or FlushTask
+ * ends or the wait time exceeds MAX_WAITING_TIME_WHEN_RELEASING.
+ */
+ public void waitIfReleasing() {
+ synchronized (blockObject) {
+ if (hasReleaseTask || hasFlushTask) {
+ try {
+ blockObject.wait(MAX_WAITING_TIME_WHEN_RELEASING);
+ } catch (InterruptedException e) {
+ logger.warn(
+ "Interrupt because the release task and flush task did not finish within {} milliseconds.",
+ MAX_WAITING_TIME_WHEN_RELEASING);
+ }
+ }
+ }
+ }
+
private synchronized void registerReleaseTask() {
if (hasReleaseTask) {
return;
@@ -120,9 +152,13 @@ public class CacheMemoryManager {
.toArray(CompletableFuture[]::new))
.join();
releaseCount++;
- hasReleaseTask = false;
- if (memManager.isExceedFlushThreshold() && !hasFlushTask) {
- registerFlushTask();
+ synchronized (blockObject) {
+ hasReleaseTask = false;
+ if (memManager.isExceedFlushThreshold() && !hasFlushTask) {
+ registerFlushTask();
+ } else {
+ blockObject.notifyAll();
+ }
}
}
}
@@ -164,8 +200,11 @@ public class CacheMemoryManager {
flushTaskExecutor))
.toArray(CompletableFuture[]::new))
.join();
- hasFlushTask = false;
flushCount++;
+ synchronized (blockObject) {
+ hasFlushTask = false;
+ blockObject.notifyAll();
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
index 7c66cca2c3..968680b455 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.metadata.logfile.SchemaLogWriter;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl;
+import org.apache.iotdb.db.metadata.mtree.store.disk.cache.CacheMemoryManager;
import org.apache.iotdb.db.metadata.plan.schemaregion.ISchemaRegionPlan;
import org.apache.iotdb.db.metadata.plan.schemaregion.SchemaRegionPlanVisitor;
import org.apache.iotdb.db.metadata.plan.schemaregion.impl.SchemaRegionPlanDeserializer;
@@ -520,8 +521,11 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void createTimeseries(ICreateTimeSeriesPlan plan, long offset) throws MetadataException {
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- logger.error("Series overflow when creating: [{}]", plan.getPath().getFullPath());
- throw new SeriesOverflowException();
+ CacheMemoryManager.getInstance().waitIfReleasing();
+ if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ logger.warn("Series overflow when creating: [{}]", plan.getPath().getFullPath());
+ throw new SeriesOverflowException();
+ }
}
if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(1)) {
@@ -640,7 +644,10 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion {
public void createAlignedTimeSeries(ICreateAlignedTimeSeriesPlan plan) throws MetadataException {
int seriesCount = plan.getMeasurements().size();
if (!memoryStatistics.isAllowToCreateNewSeries()) {
- throw new SeriesOverflowException();
+ CacheMemoryManager.getInstance().waitIfReleasing();
+ if (!memoryStatistics.isAllowToCreateNewSeries()) {
+ throw new SeriesOverflowException();
+ }
}
if (seriesNumerMonitor != null && !seriesNumerMonitor.addTimeSeries(seriesCount)) {