You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/02/19 00:57:12 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2567] Fix thread and ByteBuffer leak after service stopped (#5090)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new efc58c2  [To rel/0.12][IOTDB-2567]  Fix thread and ByteBuffer leak after service stopped (#5090)
efc58c2 is described below

commit efc58c207fb433ed3dfc64580b504021f175f18f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Sat Feb 19 08:55:25 2022 +0800

    [To rel/0.12][IOTDB-2567]  Fix thread and ByteBuffer leak after service stopped (#5090)
    
    Co-authored-by: Haonan <hh...@outlook.com>
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  2 +
 .../org/apache/iotdb/db/engine/StorageEngine.java  | 41 ++++++++++--------
 .../engine/storagegroup/StorageGroupProcessor.java | 15 +++++--
 .../virtualSg/VirtualStorageGroupManager.java      | 10 +++++
 .../org/apache/iotdb/db/utils/ThreadUtils.java     | 50 ++++++++++++++++++++++
 .../writelog/manager/MultiFileLogNodeManager.java  |  1 +
 .../db/writelog/node/ExclusiveWriteLogNode.java    | 27 +++++++++++-
 .../iotdb/db/writelog/node/WriteLogNode.java       |  6 +++
 8 files changed, 129 insertions(+), 23 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index ad1e613..a82bdae 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -37,6 +37,8 @@ public enum ThreadName {
   COMPACTION_SERVICE("Compaction"),
   WAL_DAEMON("WAL-Sync"),
   WAL_FORCE_DAEMON("WAL-Force"),
+  WAL_TRIM("WAL-trimTask"),
+  WAL_FLUSH("WAL-Flush"),
   INDEX_SERVICE("Index"),
   SYNC_CLIENT("Sync-Client"),
   SYNC_SERVER("Sync"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a83e2fc..016f1fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -32,7 +32,15 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartiti
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -50,6 +58,7 @@ import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -377,11 +386,16 @@ public class StorageEngine implements IService {
 
   @Override
   public void stop() {
+    for (VirtualStorageGroupManager storageGroupManager : processorMap.values()) {
+      storageGroupManager.stopSchedulerPool();
+    }
     syncCloseAllProcessor();
-    stopTimedService(ttlCheckThread, "TTlCheckThread");
-    stopTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
-    stopTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
-    stopTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+    ThreadUtils.stopThreadPool(ttlCheckThread, "TTlCheckThread");
+    ThreadUtils.stopThreadPool(
+        seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+    ThreadUtils.stopThreadPool(
+        unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+    ThreadUtils.stopThreadPool(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
     recoveryThreadPool.shutdownNow();
     if (!recoverAllSgThreadPool.isShutdown()) {
       recoverAllSgThreadPool.shutdownNow();
@@ -400,23 +414,12 @@ public class StorageEngine implements IService {
     this.reset();
   }
 
-  private void stopTimedService(ScheduledExecutorService pool, String poolName) {
-    if (pool != null) {
-      pool.shutdownNow();
-      try {
-        pool.awaitTermination(60, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        logger.warn("{} still doesn't exit after 60s", poolName);
-        Thread.currentThread().interrupt();
-        throw new StorageEngineFailureException(
-            String.format("StorageEngine failed to stop because of %s.", poolName), e);
-      }
-    }
-  }
-
   @Override
   public void shutdown(long milliseconds) throws ShutdownException {
     try {
+      for (VirtualStorageGroupManager storageGroupManager : processorMap.values()) {
+        storageGroupManager.stopSchedulerPool();
+      }
       forceCloseAllProcessor();
     } catch (TsFileProcessorException e) {
       throw new ShutdownException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f7b5af8..0df5743 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -268,6 +269,8 @@ public class StorageGroupProcessor {
 
   private volatile boolean compacting = false;
 
+  private ScheduledExecutorService walTrimScheduleTask;
+
   /**
    * get the direct byte buffer from pool, each fetch contains two ByteBuffer, return null if fetch
    * fails
@@ -399,10 +402,12 @@ public class StorageGroupProcessor {
     // recover tsfiles
     recover();
     // start trim task at last
-    ScheduledExecutorService executorService =
+    walTrimScheduleTask =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            String.format("WAL-trimTask-%s/%s", logicalStorageGroupName, virtualStorageGroupId));
-    executorService.scheduleWithFixedDelay(
+            String.format(
+                "%s-%s/%s",
+                ThreadName.WAL_TRIM.getName(), logicalStorageGroupName, virtualStorageGroupId));
+    walTrimScheduleTask.scheduleWithFixedDelay(
         this::trimTask,
         config.getWalPoolTrimIntervalInMS(),
         config.getWalPoolTrimIntervalInMS(),
@@ -3067,4 +3072,8 @@ public class StorageGroupProcessor {
   public String getInsertWriteLockHolder() {
     return insertWriteLockHolder;
   }
+
+  /**
+   * Exposes the scheduled executor on which this processor registers its periodic WAL trim task.
+   *
+   * @return the executor stored in {@code walTrimScheduleTask}
+   */
+  public ScheduledExecutorService getWALTrimScheduleTask() {
+    return this.walTrimScheduleTask;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 2012101..3af47c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.storagegroup.virtualSg;
 
+import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -432,4 +434,12 @@ public class VirtualStorageGroupManager {
   public void reset() {
     Arrays.fill(virtualStorageGroupProcessor, null);
   }
+
+  /**
+   * Stops the WAL trim scheduler of every virtual storage group processor held by this manager.
+   * Array slots that are still {@code null} are skipped.
+   */
+  public void stopSchedulerPool() {
+    for (StorageGroupProcessor processor : this.virtualStorageGroupProcessor) {
+      if (processor == null) {
+        continue;
+      }
+      ThreadUtils.stopThreadPool(
+          processor.getWALTrimScheduleTask(), ThreadName.WAL_TRIM.getName());
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java
new file mode 100644
index 0000000..490be5f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Utilities for stopping threads and thread pools. */
+public class ThreadUtils {
+
+  // Log under this class so that messages are attributed to the code that emits them.
+  private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
+
+  /** How long {@link #stopThreadPool} waits for a pool to terminate, in seconds. */
+  private static final long TERMINATION_TIMEOUT_SECONDS = 60;
+
+  private ThreadUtils() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Shuts the given pool down immediately via {@code shutdownNow()} and then waits up to 60
+   * seconds for it to terminate. A timeout is only logged as a warning; the method still returns
+   * normally in that case.
+   *
+   * @param pool the pool to stop; when {@code null} the call is a no-op
+   * @param poolName the name used for the pool in log and exception messages
+   * @throws StorageEngineFailureException if the calling thread is interrupted while waiting; the
+   *     thread's interrupt flag is restored before the exception is thrown
+   */
+  public static void stopThreadPool(ExecutorService pool, String poolName) {
+    if (pool == null) {
+      return;
+    }
+    pool.shutdownNow();
+    try {
+      if (!pool.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+        // awaitTermination returned false: the timeout elapsed before termination.
+        logger.warn("{} still doesn't exit after {}s", poolName, TERMINATION_TIMEOUT_SECONDS);
+      }
+    } catch (InterruptedException e) {
+      // An interrupt is not a timeout, so report it as what it is.
+      logger.warn("Interrupted while waiting for {} to terminate", poolName);
+      Thread.currentThread().interrupt();
+      throw new StorageEngineFailureException(
+          String.format("StorageEngine failed to stop because of %s.", poolName), e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index ddaf132..55515b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -147,6 +147,7 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
       } catch (IOException e) {
         logger.error("failed to close {}", node, e);
       }
+      node.release();
     }
     nodeMap.clear();
     logger.info("LogNodeManager closed.");
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 454fd95..33bea57 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.utils.ThreadUtils;
 import org.apache.iotdb.db.writelog.io.ILogReader;
 import org.apache.iotdb.db.writelog.io.ILogWriter;
 import org.apache.iotdb.db.writelog.io.LogWriter;
@@ -38,12 +40,15 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.iotdb.db.concurrent.ThreadName.WAL_FLUSH;
+
 /** This WriteLogNode is used to manage insert ahead logs of a TsFile. */
 public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<ExclusiveWriteLogNode> {
 
@@ -92,7 +97,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
     }
     // this.identifier contains the storage group name + tsfile name.
     FLUSH_BUFFER_THREAD_POOL =
-        IoTDBThreadPoolFactory.newSingleThreadExecutor("Flush-WAL-Thread-" + this.identifier);
+        IoTDBThreadPoolFactory.newSingleThreadExecutor(WAL_FLUSH.getName() + "-" + this.identifier);
   }
 
   @Override
@@ -166,6 +171,26 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
   }
 
   @Override
+  public void release() {
+    // Shut the flush pool down first, then unmap the log buffers while holding the node lock.
+    ThreadUtils.stopThreadPool(FLUSH_BUFFER_THREAD_POOL, WAL_FLUSH.getName());
+    lock.lock();
+    try {
+      // Only memory-mapped buffers are cleaned. instanceof evaluates to false for null, so no
+      // separate null check is needed. Each guard and the cast below it must name the same
+      // buffer, otherwise one buffer stays mapped while another is cleaned unchecked.
+      if (this.logBufferWorking instanceof MappedByteBuffer) {
+        MmapUtil.clean((MappedByteBuffer) this.logBufferWorking);
+      }
+      if (this.logBufferIdle instanceof MappedByteBuffer) {
+        MmapUtil.clean((MappedByteBuffer) this.logBufferIdle);
+      }
+      if (this.logBufferFlushing instanceof MappedByteBuffer) {
+        MmapUtil.clean((MappedByteBuffer) this.logBufferFlushing);
+      }
+      logger.debug("ByteBuffers are freed successfully");
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
   public void forceSync() {
     if (deleted.get()) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index 21be3f7..952238f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -39,6 +39,12 @@ public interface WriteLogNode {
   /** Sync and close streams. */
   void close() throws IOException;
 
+  /**
+   * Release the resources this node occupies. After this method is invoked, this node must no
+   * longer be used.
+   */
+  void release();
+
   /** Write what in cache to disk. */
   void forceSync() throws IOException;