You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/17 13:21:04 UTC
[iotdb] branch master updated: [IOTDB-5549] Ensure concurrently MNode memory release (#9089)
This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 20a82d8a10 [IOTDB-5549] Ensure concurrently MNode memory release (#9089)
20a82d8a10 is described below
commit 20a82d8a10646700dbfb03008b1fae9a6ec9909f
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Feb 17 21:20:56 2023 +0800
[IOTDB-5549] Ensure concurrently MNode memory release (#9089)
---
.../iotdb/commons/concurrent/ThreadName.java | 6 +-
.../mtree/store/disk/cache/CacheMemoryManager.java | 132 +++++++++++++--------
.../iotdb/db/utils/concurrent/FiniteSemaphore.java | 56 +++++++++
.../schemaRegion/SchemaStatisticsTest.java | 3 -
4 files changed, 144 insertions(+), 53 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 588d473841..9b33ae30ee 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -62,8 +62,10 @@ public enum ThreadName {
ASYNC_CONFIGNODE_CLIENT_POOL("AsyncConfigNodeIServiceClientPool"),
ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
- SCHEMA_REGION_RELEASE_POOL("SchemaRegion-Release-Task"),
- SCHEMA_REGION_FLUSH_POOL("SchemaRegion-Flush-Task");
+ SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
+ SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
+ SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
+ SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor");
private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 8ac11408b4..a4d13e6153 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStra
import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStrategySizeBasedImpl;
import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
import org.apache.iotdb.db.metadata.rescon.SchemaEngineStatisticsHolder;
+import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,8 +53,13 @@ public class CacheMemoryManager {
private static final int CONCURRENT_NUM = 10;
- private ExecutorService flushTaskExecutor;
- private ExecutorService releaseTaskExecutor;
+ private ExecutorService flushTaskProcessor;
+ private ExecutorService flushTaskMonitor;
+ private ExecutorService releaseTaskProcessor;
+ private ExecutorService releaseTaskMonitor;
+
+ private FiniteSemaphore flushSemaphore;
+ private FiniteSemaphore releaseSemaphore;
private volatile boolean hasFlushTask;
private int flushCount = 0;
@@ -81,6 +87,8 @@ public class CacheMemoryManager {
}
public void init() {
+ flushSemaphore = new FiniteSemaphore(2, 0);
+ releaseSemaphore = new FiniteSemaphore(2, 0);
engineStatistics =
SchemaEngineStatisticsHolder.getSchemaEngineStatistics()
.getAsCachedSchemaEngineStatistics();
@@ -89,12 +97,56 @@ public class CacheMemoryManager {
} else {
releaseFlushStrategy = new ReleaseFlushStrategySizeBasedImpl(engineStatistics);
}
- flushTaskExecutor =
+ flushTaskMonitor =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_FLUSH_MONITOR.getName());
+ flushTaskProcessor =
IoTDBThreadPoolFactory.newFixedThreadPool(
- CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_POOL.getName());
- releaseTaskExecutor =
+ CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_PROCESSOR.getName());
+ releaseTaskMonitor =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_RELEASE_MONITOR.getName());
+ releaseTaskProcessor =
IoTDBThreadPoolFactory.newFixedThreadPool(
- CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_POOL.getName());
+ CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_PROCESSOR.getName());
+ releaseTaskMonitor.submit(
+ () -> {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ releaseSemaphore.acquire();
+ try {
+ if (isExceedReleaseThreshold()) {
+ hasReleaseTask = true;
+ tryExecuteMemoryRelease();
+ }
+ } catch (Throwable throwable) {
+ logger.error("Something wrong happened during MTree release.", throwable);
+ throwable.printStackTrace();
+ throw throwable;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.info("ReleaseTaskMonitor thread is interrupted.");
+ }
+ });
+ flushTaskMonitor.submit(
+ () -> {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ flushSemaphore.acquire();
+ try {
+ if (isExceedFlushThreshold()) {
+ hasFlushTask = true;
+ tryFlushVolatileNodes();
+ }
+ } catch (Throwable throwable) {
+ logger.error("Something wrong happened during MTree flush.", throwable);
+ throwable.printStackTrace();
+ throw throwable;
+ }
+ }
+ } catch (InterruptedException e) {
+ logger.info("FlushTaskMonitor thread is interrupted.");
+ }
+ });
}
public boolean isExceedReleaseThreshold() {
@@ -110,7 +162,7 @@ public class CacheMemoryManager {
* perform an internal and external memory swap to release the memory.
*/
public void ensureMemoryStatus() {
- if (isExceedReleaseThreshold() && !hasReleaseTask) {
+ if (isExceedReleaseThreshold()) {
registerReleaseTask();
}
}
@@ -134,21 +186,8 @@ public class CacheMemoryManager {
}
}
- private synchronized void registerReleaseTask() {
- if (hasReleaseTask) {
- return;
- }
- hasReleaseTask = true;
- releaseTaskExecutor.submit(
- () -> {
- try {
- tryExecuteMemoryRelease();
- } catch (Throwable throwable) {
- logger.error("Something wrong happened during MTree release.", throwable);
- throwable.printStackTrace();
- throw throwable;
- }
- });
+ private void registerReleaseTask() {
+ releaseSemaphore.release();
}
/**
@@ -171,13 +210,13 @@ public class CacheMemoryManager {
store.getLock().threadReadUnlock();
}
},
- releaseTaskExecutor))
+ releaseTaskProcessor))
.toArray(CompletableFuture[]::new))
.join();
releaseCount++;
synchronized (blockObject) {
hasReleaseTask = false;
- if (isExceedFlushThreshold() && !hasFlushTask) {
+ if (isExceedFlushThreshold()) {
registerFlushTask();
} else {
blockObject.notifyAll();
@@ -196,21 +235,8 @@ public class CacheMemoryManager {
}
}
- private synchronized void registerFlushTask() {
- if (hasFlushTask) {
- return;
- }
- hasFlushTask = true;
- flushTaskExecutor.submit(
- () -> {
- try {
- tryFlushVolatileNodes();
- } catch (Throwable throwable) {
- logger.error("Something wrong happened during MTree flush.", throwable);
- throwable.printStackTrace();
- throw throwable;
- }
- });
+ private void registerFlushTask() {
+ flushSemaphore.release();
}
/** Sync all volatile nodes to schemaFile and execute memory release after flush. */
@@ -230,7 +256,7 @@ public class CacheMemoryManager {
store.getLock().unlockWrite();
}
},
- flushTaskExecutor))
+ flushTaskProcessor))
.toArray(CompletableFuture[]::new))
.join();
flushCount++;
@@ -242,30 +268,40 @@ public class CacheMemoryManager {
}
public void clear() {
- if (releaseTaskExecutor != null) {
+ if (releaseTaskMonitor != null) {
+ releaseTaskMonitor.shutdownNow();
+ releaseTaskMonitor = null;
+ }
+ if (flushTaskMonitor != null) {
+ flushTaskMonitor.shutdownNow();
+ flushTaskMonitor = null;
+ }
+ if (releaseTaskProcessor != null) {
while (true) {
if (!hasReleaseTask) break;
}
- releaseTaskExecutor.shutdown();
+ releaseTaskProcessor.shutdown();
while (true) {
- if (releaseTaskExecutor.isTerminated()) break;
+ if (releaseTaskProcessor.isTerminated()) break;
}
- releaseTaskExecutor = null;
+ releaseTaskProcessor = null;
}
// the release task may submit flush task, thus must be shut down and clear first
- if (flushTaskExecutor != null) {
+ if (flushTaskProcessor != null) {
while (true) {
if (!hasFlushTask) break;
}
- flushTaskExecutor.shutdown();
+ flushTaskProcessor.shutdown();
while (true) {
- if (flushTaskExecutor.isTerminated()) break;
+ if (flushTaskProcessor.isTerminated()) break;
}
- flushTaskExecutor = null;
+ flushTaskProcessor = null;
}
storeList.clear();
releaseFlushStrategy = null;
engineStatistics = null;
+ releaseSemaphore = null;
+ flushSemaphore = null;
}
private CacheMemoryManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java b/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
new file mode 100644
index 0000000000..8fd9280904
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.concurrent;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * FiniteSemaphore is a Semaphore whose number of available permits is capped at capacity. A
+ * release request made while the permits are already at capacity is ignored.
+ */
+public class FiniteSemaphore {
+ private final int capacity;
+ private final Semaphore semaphore;
+ private int permit;
+
+ public FiniteSemaphore(int capacity, int permit) {
+ if (capacity < permit) {
+ throw new IllegalArgumentException("Capacity should not be less than initial permits.");
+ }
+ this.capacity = capacity;
+ this.semaphore = new Semaphore(permit);
+ this.permit = permit;
+ }
+
+ public void release() {
+ synchronized (this) {
+ if (permit < capacity) {
+ permit++;
+ semaphore.release();
+ }
+ }
+ }
+
+ public void acquire() throws InterruptedException {
+ semaphore.acquire();
+ synchronized (this) {
+ permit--;
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index 867fa497d7..66abe8faab 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.Arrays;
@@ -53,7 +52,6 @@ public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
}
@Test
- @Ignore
public void testMemoryStatistics() throws Exception {
ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
@@ -195,7 +193,6 @@ public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
}
@Test
- @Ignore
public void testSchemaFileNodeStatistics() throws Exception {
if (testParams.getSchemaEngineMode().equals("Schema_File")) {
ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);