You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by zy...@apache.org on 2023/02/17 13:21:04 UTC

[iotdb] branch master updated: [IOTDB-5549] Ensure concurrently MNode memory release (#9089)

This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 20a82d8a10 [IOTDB-5549] Ensure concurrently MNode memory release (#9089)
20a82d8a10 is described below

commit 20a82d8a10646700dbfb03008b1fae9a6ec9909f
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Fri Feb 17 21:20:56 2023 +0800

    [IOTDB-5549] Ensure concurrently MNode memory release (#9089)
---
 .../iotdb/commons/concurrent/ThreadName.java       |   6 +-
 .../mtree/store/disk/cache/CacheMemoryManager.java | 132 +++++++++++++--------
 .../iotdb/db/utils/concurrent/FiniteSemaphore.java |  56 +++++++++
 .../schemaRegion/SchemaStatisticsTest.java         |   3 -
 4 files changed, 144 insertions(+), 53 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 588d473841..9b33ae30ee 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -62,8 +62,10 @@ public enum ThreadName {
   ASYNC_CONFIGNODE_CLIENT_POOL("AsyncConfigNodeIServiceClientPool"),
   ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
   ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeMPPDataExchangeServiceClientPool"),
-  SCHEMA_REGION_RELEASE_POOL("SchemaRegion-Release-Task"),
-  SCHEMA_REGION_FLUSH_POOL("SchemaRegion-Flush-Task");
+  SCHEMA_REGION_RELEASE_PROCESSOR("SchemaRegion-Release-Task-Processor"),
+  SCHEMA_RELEASE_MONITOR("Schema-Release-Task-Monitor"),
+  SCHEMA_REGION_FLUSH_PROCESSOR("SchemaRegion-Flush-Task-Processor"),
+  SCHEMA_FLUSH_MONITOR("Schema-Flush-Task-Monitor");
 
   private final String name;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
index 8ac11408b4..a4d13e6153 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/store/disk/cache/CacheMemoryManager.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStra
 import org.apache.iotdb.db.metadata.mtree.store.disk.memcontrol.ReleaseFlushStrategySizeBasedImpl;
 import org.apache.iotdb.db.metadata.rescon.CachedSchemaEngineStatistics;
 import org.apache.iotdb.db.metadata.rescon.SchemaEngineStatisticsHolder;
+import org.apache.iotdb.db.utils.concurrent.FiniteSemaphore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,8 +53,13 @@ public class CacheMemoryManager {
 
   private static final int CONCURRENT_NUM = 10;
 
-  private ExecutorService flushTaskExecutor;
-  private ExecutorService releaseTaskExecutor;
+  private ExecutorService flushTaskProcessor;
+  private ExecutorService flushTaskMonitor;
+  private ExecutorService releaseTaskProcessor;
+  private ExecutorService releaseTaskMonitor;
+
+  private FiniteSemaphore flushSemaphore;
+  private FiniteSemaphore releaseSemaphore;
 
   private volatile boolean hasFlushTask;
   private int flushCount = 0;
@@ -81,6 +87,8 @@ public class CacheMemoryManager {
   }
 
   public void init() {
+    flushSemaphore = new FiniteSemaphore(2, 0);
+    releaseSemaphore = new FiniteSemaphore(2, 0);
     engineStatistics =
         SchemaEngineStatisticsHolder.getSchemaEngineStatistics()
             .getAsCachedSchemaEngineStatistics();
@@ -89,12 +97,56 @@ public class CacheMemoryManager {
     } else {
       releaseFlushStrategy = new ReleaseFlushStrategySizeBasedImpl(engineStatistics);
     }
-    flushTaskExecutor =
+    flushTaskMonitor =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_FLUSH_MONITOR.getName());
+    flushTaskProcessor =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_POOL.getName());
-    releaseTaskExecutor =
+            CONCURRENT_NUM, ThreadName.SCHEMA_REGION_FLUSH_PROCESSOR.getName());
+    releaseTaskMonitor =
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SCHEMA_RELEASE_MONITOR.getName());
+    releaseTaskProcessor =
         IoTDBThreadPoolFactory.newFixedThreadPool(
-            CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_POOL.getName());
+            CONCURRENT_NUM, ThreadName.SCHEMA_REGION_RELEASE_PROCESSOR.getName());
+    releaseTaskMonitor.submit(
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              releaseSemaphore.acquire();
+              try {
+                if (isExceedReleaseThreshold()) {
+                  hasReleaseTask = true;
+                  tryExecuteMemoryRelease();
+                }
+              } catch (Throwable throwable) {
+                logger.error("Something wrong happened during MTree release.", throwable);
+                throwable.printStackTrace();
+                throw throwable;
+              }
+            }
+          } catch (InterruptedException e) {
+            logger.info("ReleaseTaskMonitor thread is interrupted.");
+          }
+        });
+    flushTaskMonitor.submit(
+        () -> {
+          try {
+            while (!Thread.currentThread().isInterrupted()) {
+              flushSemaphore.acquire();
+              try {
+                if (isExceedFlushThreshold()) {
+                  hasFlushTask = true;
+                  tryFlushVolatileNodes();
+                }
+              } catch (Throwable throwable) {
+                logger.error("Something wrong happened during MTree flush.", throwable);
+                throwable.printStackTrace();
+                throw throwable;
+              }
+            }
+          } catch (InterruptedException e) {
+            logger.info("FlushTaskMonitor thread is interrupted.");
+          }
+        });
   }
 
   public boolean isExceedReleaseThreshold() {
@@ -110,7 +162,7 @@ public class CacheMemoryManager {
    * perform an internal and external memory swap to release the memory.
    */
   public void ensureMemoryStatus() {
-    if (isExceedReleaseThreshold() && !hasReleaseTask) {
+    if (isExceedReleaseThreshold()) {
       registerReleaseTask();
     }
   }
@@ -134,21 +186,8 @@ public class CacheMemoryManager {
     }
   }
 
-  private synchronized void registerReleaseTask() {
-    if (hasReleaseTask) {
-      return;
-    }
-    hasReleaseTask = true;
-    releaseTaskExecutor.submit(
-        () -> {
-          try {
-            tryExecuteMemoryRelease();
-          } catch (Throwable throwable) {
-            logger.error("Something wrong happened during MTree release.", throwable);
-            throwable.printStackTrace();
-            throw throwable;
-          }
-        });
+  private void registerReleaseTask() {
+    releaseSemaphore.release();
   }
 
   /**
@@ -171,13 +210,13 @@ public class CacheMemoryManager {
                                   store.getLock().threadReadUnlock();
                                 }
                               },
-                              releaseTaskExecutor))
+                              releaseTaskProcessor))
                   .toArray(CompletableFuture[]::new))
           .join();
       releaseCount++;
       synchronized (blockObject) {
         hasReleaseTask = false;
-        if (isExceedFlushThreshold() && !hasFlushTask) {
+        if (isExceedFlushThreshold()) {
           registerFlushTask();
         } else {
           blockObject.notifyAll();
@@ -196,21 +235,8 @@ public class CacheMemoryManager {
     }
   }
 
-  private synchronized void registerFlushTask() {
-    if (hasFlushTask) {
-      return;
-    }
-    hasFlushTask = true;
-    flushTaskExecutor.submit(
-        () -> {
-          try {
-            tryFlushVolatileNodes();
-          } catch (Throwable throwable) {
-            logger.error("Something wrong happened during MTree flush.", throwable);
-            throwable.printStackTrace();
-            throw throwable;
-          }
-        });
+  private void registerFlushTask() {
+    flushSemaphore.release();
   }
 
   /** Sync all volatile nodes to schemaFile and execute memory release after flush. */
@@ -230,7 +256,7 @@ public class CacheMemoryManager {
                                   store.getLock().unlockWrite();
                                 }
                               },
-                              flushTaskExecutor))
+                              flushTaskProcessor))
                   .toArray(CompletableFuture[]::new))
           .join();
       flushCount++;
@@ -242,30 +268,40 @@ public class CacheMemoryManager {
   }
 
   public void clear() {
-    if (releaseTaskExecutor != null) {
+    if (releaseTaskMonitor != null) {
+      releaseTaskMonitor.shutdownNow();
+      releaseTaskMonitor = null;
+    }
+    if (flushTaskMonitor != null) {
+      flushTaskMonitor.shutdownNow();
+      flushTaskMonitor = null;
+    }
+    if (releaseTaskProcessor != null) {
       while (true) {
         if (!hasReleaseTask) break;
       }
-      releaseTaskExecutor.shutdown();
+      releaseTaskProcessor.shutdown();
       while (true) {
-        if (releaseTaskExecutor.isTerminated()) break;
+        if (releaseTaskProcessor.isTerminated()) break;
       }
-      releaseTaskExecutor = null;
+      releaseTaskProcessor = null;
     }
     // the release task may submit flush task, thus must be shut down and clear first
-    if (flushTaskExecutor != null) {
+    if (flushTaskProcessor != null) {
       while (true) {
         if (!hasFlushTask) break;
       }
-      flushTaskExecutor.shutdown();
+      flushTaskProcessor.shutdown();
       while (true) {
-        if (flushTaskExecutor.isTerminated()) break;
+        if (flushTaskProcessor.isTerminated()) break;
       }
-      flushTaskExecutor = null;
+      flushTaskProcessor = null;
     }
     storeList.clear();
     releaseFlushStrategy = null;
     engineStatistics = null;
+    releaseSemaphore = null;
+    flushSemaphore = null;
   }
 
   private CacheMemoryManager() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java b/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
new file mode 100644
index 0000000000..8fd9280904
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/concurrent/FiniteSemaphore.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils.concurrent;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * FiniteSemaphore is a semaphore whose number of available permits never exceeds its capacity. A
+ * release request issued while the permits are already at capacity is ignored.
+ */
+public class FiniteSemaphore {
+  private final int capacity;
+  private final Semaphore semaphore;
+  private int permit;
+
+  public FiniteSemaphore(int capacity, int permit) {
+    if (capacity < permit) {
+      throw new IllegalArgumentException("Capacity should be larger than initial permits.");
+    }
+    this.capacity = capacity;
+    this.semaphore = new Semaphore(permit);
+    this.permit = permit;
+  }
+
+  public void release() {
+    synchronized (this) {
+      if (permit < capacity) {
+        permit++;
+        semaphore.release();
+      }
+    }
+  }
+
+  public void acquire() throws InterruptedException {
+    semaphore.acquire();
+    synchronized (this) {
+      permit--;
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
index 867fa497d7..66abe8faab 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/schemaRegion/SchemaStatisticsTest.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -53,7 +52,6 @@ public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
   }
 
   @Test
-  @Ignore
   public void testMemoryStatistics() throws Exception {
     ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);
     ISchemaRegion schemaRegion2 = getSchemaRegion("root.sg2", 1);
@@ -195,7 +193,6 @@ public class SchemaStatisticsTest extends AbstractSchemaRegionTest {
   }
 
   @Test
-  @Ignore
   public void testSchemaFileNodeStatistics() throws Exception {
     if (testParams.getSchemaEngineMode().equals("Schema_File")) {
       ISchemaRegion schemaRegion1 = getSchemaRegion("root.sg1", 0);