You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/02/19 00:57:12 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2567] Fix thread and ByteBuffer leak after service stopped (#5090)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new efc58c2 [To rel/0.12][IOTDB-2567] Fix thread and ByteBuffer leak after service stopped (#5090)
efc58c2 is described below
commit efc58c207fb433ed3dfc64580b504021f175f18f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Sat Feb 19 08:55:25 2022 +0800
[To rel/0.12][IOTDB-2567] Fix thread and ByteBuffer leak after service stopped (#5090)
Co-authored-by: Haonan <hh...@outlook.com>
---
.../org/apache/iotdb/db/concurrent/ThreadName.java | 2 +
.../org/apache/iotdb/db/engine/StorageEngine.java | 41 ++++++++++--------
.../engine/storagegroup/StorageGroupProcessor.java | 15 +++++--
.../virtualSg/VirtualStorageGroupManager.java | 10 +++++
.../org/apache/iotdb/db/utils/ThreadUtils.java | 50 ++++++++++++++++++++++
.../writelog/manager/MultiFileLogNodeManager.java | 1 +
.../db/writelog/node/ExclusiveWriteLogNode.java | 27 +++++++++++-
.../iotdb/db/writelog/node/WriteLogNode.java | 6 +++
8 files changed, 129 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index ad1e613..a82bdae 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -37,6 +37,8 @@ public enum ThreadName {
COMPACTION_SERVICE("Compaction"),
WAL_DAEMON("WAL-Sync"),
WAL_FORCE_DAEMON("WAL-Force"),
+ WAL_TRIM("WAL-trimTask"),
+ WAL_FLUSH("WAL-Flush"),
INDEX_SERVICE("Index"),
SYNC_CLIENT("Sync-Client"),
SYNC_SERVER("Sync"),
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index a83e2fc..016f1fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -32,7 +32,15 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartiti
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.virtualSg.VirtualStorageGroupManager;
-import org.apache.iotdb.db.exception.*;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.exception.ShutdownException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
@@ -50,6 +58,7 @@ import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -377,11 +386,16 @@ public class StorageEngine implements IService {
@Override
public void stop() {
+ for (VirtualStorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.stopSchedulerPool();
+ }
syncCloseAllProcessor();
- stopTimedService(ttlCheckThread, "TTlCheckThread");
- stopTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
- stopTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
- stopTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+ ThreadUtils.stopThreadPool(ttlCheckThread, "TTlCheckThread");
+ ThreadUtils.stopThreadPool(
+ seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+ ThreadUtils.stopThreadPool(
+ unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+ ThreadUtils.stopThreadPool(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
recoveryThreadPool.shutdownNow();
if (!recoverAllSgThreadPool.isShutdown()) {
recoverAllSgThreadPool.shutdownNow();
@@ -400,23 +414,12 @@ public class StorageEngine implements IService {
this.reset();
}
- private void stopTimedService(ScheduledExecutorService pool, String poolName) {
- if (pool != null) {
- pool.shutdownNow();
- try {
- pool.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.warn("{} still doesn't exit after 60s", poolName);
- Thread.currentThread().interrupt();
- throw new StorageEngineFailureException(
- String.format("StorageEngine failed to stop because of %s.", poolName), e);
- }
- }
- }
-
@Override
public void shutdown(long milliseconds) throws ShutdownException {
try {
+ for (VirtualStorageGroupManager storageGroupManager : processorMap.values()) {
+ storageGroupManager.stopSchedulerPool();
+ }
forceCloseAllProcessor();
} catch (TsFileProcessorException e) {
throw new ShutdownException(e);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f7b5af8..0df5743 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -268,6 +269,8 @@ public class StorageGroupProcessor {
private volatile boolean compacting = false;
+ private ScheduledExecutorService walTrimScheduleTask;
+
/**
* get the direct byte buffer from pool, each fetch contains two ByteBuffer, return null if fetch
* fails
@@ -399,10 +402,12 @@ public class StorageGroupProcessor {
// recover tsfiles
recover();
// start trim task at last
- ScheduledExecutorService executorService =
+ walTrimScheduleTask =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- String.format("WAL-trimTask-%s/%s", logicalStorageGroupName, virtualStorageGroupId));
- executorService.scheduleWithFixedDelay(
+ String.format(
+ "%s-%s/%s",
+ ThreadName.WAL_TRIM.getName(), logicalStorageGroupName, virtualStorageGroupId));
+ walTrimScheduleTask.scheduleWithFixedDelay(
this::trimTask,
config.getWalPoolTrimIntervalInMS(),
config.getWalPoolTrimIntervalInMS(),
@@ -3067,4 +3072,8 @@ public class StorageGroupProcessor {
public String getInsertWriteLockHolder() {
return insertWriteLockHolder;
}
+
+ public ScheduledExecutorService getWALTrimScheduleTask() {
+ return walTrimScheduleTask;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 2012101..3af47c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.storagegroup.virtualSg;
+import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartitionFilter;
@@ -29,6 +30,7 @@ import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -432,4 +434,12 @@ public class VirtualStorageGroupManager {
public void reset() {
Arrays.fill(virtualStorageGroupProcessor, null);
}
+
+ public void stopSchedulerPool() {
+ for (StorageGroupProcessor vsg : this.virtualStorageGroupProcessor) {
+ if (vsg != null) {
+ ThreadUtils.stopThreadPool(vsg.getWALTrimScheduleTask(), ThreadName.WAL_TRIM.getName());
+ }
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java
new file mode 100644
index 0000000..490be5f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/ThreadUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.utils;
+
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** the utils for managing thread or thread pool */
+public class ThreadUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
+
+ public static void stopThreadPool(ExecutorService pool, String poolName) {
+ if (pool != null) {
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+ logger.warn("Waiting {} to be terminated is timeout", poolName);
+ }
+ } catch (InterruptedException e) {
+ logger.warn("{} still doesn't exit after 60s", poolName);
+ Thread.currentThread().interrupt();
+ throw new StorageEngineFailureException(
+ String.format("StorageEngine failed to stop because of %s.", poolName), e);
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index ddaf132..55515b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -147,6 +147,7 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
} catch (IOException e) {
logger.error("failed to close {}", node, e);
}
+ node.release();
}
nodeMap.clear();
logger.info("LogNodeManager closed.");
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
index 454fd95..33bea57 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.utils.MmapUtil;
+import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.writelog.io.ILogReader;
import org.apache.iotdb.db.writelog.io.ILogWriter;
import org.apache.iotdb.db.writelog.io.LogWriter;
@@ -38,12 +40,15 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.iotdb.db.concurrent.ThreadName.WAL_FLUSH;
+
/** This WriteLogNode is used to manage insert ahead logs of a TsFile. */
public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<ExclusiveWriteLogNode> {
@@ -92,7 +97,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
// this.identifier contains the storage group name + tsfile name.
FLUSH_BUFFER_THREAD_POOL =
- IoTDBThreadPoolFactory.newSingleThreadExecutor("Flush-WAL-Thread-" + this.identifier);
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(WAL_FLUSH.getName() + "-" + this.identifier);
}
@Override
@@ -166,6 +171,26 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive
}
@Override
+ public void release() {
+ ThreadUtils.stopThreadPool(FLUSH_BUFFER_THREAD_POOL, WAL_FLUSH.getName());
+ lock.lock();
+ try {
+ if (this.logBufferWorking != null && this.logBufferWorking instanceof MappedByteBuffer) {
+ MmapUtil.clean((MappedByteBuffer) this.logBufferFlushing);
+ }
+ if (this.logBufferIdle != null && this.logBufferIdle instanceof MappedByteBuffer) {
+ MmapUtil.clean((MappedByteBuffer) this.logBufferIdle);
+ }
+ if (this.logBufferFlushing != null && this.logBufferFlushing instanceof MappedByteBuffer) {
+ MmapUtil.clean((MappedByteBuffer) this.logBufferFlushing);
+ }
+ logger.debug("ByteBuffers are freed successfully");
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
public void forceSync() {
if (deleted.get()) {
return;
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
index 21be3f7..952238f 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/WriteLogNode.java
@@ -39,6 +39,12 @@ public interface WriteLogNode {
/** Sync and close streams. */
void close() throws IOException;
+ /**
+ * Release the resource it occupies. After this method is invoked, this node should be no longer
+ * to used
+ */
+ void release();
+
/** Write what in cache to disk. */
void forceSync() throws IOException;