You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/01/05 07:46:11 UTC
[iotdb] branch client_manager_add_close updated: Fix QueryFrequencyRecorder thread leaking Fix WalTrim thread leaking Add thread names
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch client_manager_add_close
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/client_manager_add_close by this push:
new 0dfe0b1 Fix QueryFrequencyRecorder thread leaking Fix WalTrim thread leaking Add thread names
0dfe0b1 is described below
commit 0dfe0b14bcfb4802b88b1401f00d47d821cd2751
Author: jt <jt...@163.com>
AuthorDate: Wed Jan 5 15:44:47 2022 +0800
Fix QueryFrequencyRecorder thread leaking
Fix WalTrim thread leaking
Add thread names
---
.../iotdb/cluster/server/StoppedMemberManager.java | 1 -
.../server/clusterinfo/ClusterInfoServerTest.java | 4 ----
.../clusterinfo/ClusterInfoServiceImplTest.java | 3 ---
.../org/apache/iotdb/db/engine/StorageEngine.java | 14 ++++++++++----
.../storagegroup/VirtualStorageGroupProcessor.java | 15 ++++++++++++---
.../storagegroup/virtualSg/StorageGroupManager.java | 9 +++++++++
.../iotdb/db/service/basic/BasicServiceProvider.java | 3 ++-
.../db/service/basic/QueryFrequencyRecorder.java | 19 ++++++++++++++++++-
8 files changed, 51 insertions(+), 17 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index d5ed278..14f6efe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.utils.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
index b3689fd..4ea3e65 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.cluster.server.clusterinfo;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -34,8 +32,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-
public class ClusterInfoServerTest {
ClusterInfoServiceImplTest test;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
index cce2f05..00c1377 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -35,7 +33,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 35900c7..b7cad9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.ServerConfigConsistent;
@@ -94,6 +95,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class StorageEngine implements IService {
+
private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -273,7 +275,7 @@ public class StorageEngine implements IService {
recover();
- ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
+ ttlCheckThread = Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CheckTTL"));
ttlCheckThread.scheduleAtFixedRate(
this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
logger.info("start ttl check thread successfully.");
@@ -296,7 +298,8 @@ public class StorageEngine implements IService {
private void startTimedService() {
// timed flush sequence memtable
if (config.isEnableTimedFlushSeqMemtable()) {
- seqMemtableTimedFlushCheckThread = Executors.newSingleThreadScheduledExecutor();
+ seqMemtableTimedFlushCheckThread =
+ Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("FlushSeqMemTable"));
seqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
this::timedFlushSeqMemTable,
config.getSeqMemtableFlushCheckInterval(),
@@ -306,7 +309,8 @@ public class StorageEngine implements IService {
}
// timed flush unsequence memtable
if (config.isEnableTimedFlushUnseqMemtable()) {
- unseqMemtableTimedFlushCheckThread = Executors.newSingleThreadScheduledExecutor();
+ unseqMemtableTimedFlushCheckThread =
+ Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("FlushUnseqMemTable"));
unseqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
this::timedFlushUnseqMemTable,
config.getUnseqMemtableFlushCheckInterval(),
@@ -316,7 +320,8 @@ public class StorageEngine implements IService {
}
// timed close tsfile
if (config.isEnableTimedCloseTsFile()) {
- tsFileTimedCloseCheckThread = Executors.newSingleThreadScheduledExecutor();
+ tsFileTimedCloseCheckThread =
+ Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CloseTsFile"));
tsFileTimedCloseCheckThread.scheduleAtFixedRate(
this::timedCloseTsFileProcessor,
config.getCloseTsFileCheckInterval(),
@@ -360,6 +365,7 @@ public class StorageEngine implements IService {
public void stop() {
for (StorageGroupManager storageGroupManager : processorMap.values()) {
storageGroupManager.stopCompactionSchedulerPool();
+ storageGroupManager.stopWalTrimPool();
}
syncCloseAllProcessor();
stopTimedService(ttlCheckThread, "TTlCheckThread");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index df72353..6c174f3 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -271,12 +272,14 @@ public class VirtualStorageGroupProcessor {
private String insertWriteLockHolder = "";
private ScheduledExecutorService timedCompactionScheduleTask =
- Executors.newSingleThreadScheduledExecutor();
+ Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CompactionSchedule"));
public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
private IDTable idTable;
+ private ScheduledExecutorService trimWalService;
+
/** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
@@ -386,8 +389,9 @@ public class VirtualStorageGroupProcessor {
logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
}
- ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(
+ trimWalService =
+ Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("TrimWalBuffer"));
+ trimWalService.scheduleWithFixedDelay(
this::trimTask,
config.getWalPoolTrimIntervalInMS(),
config.getWalPoolTrimIntervalInMS(),
@@ -3212,6 +3216,7 @@ public class VirtualStorageGroupProcessor {
@FunctionalInterface
public interface CompactionRecoverCallBack {
+
void call();
}
@@ -3239,4 +3244,8 @@ public class VirtualStorageGroupProcessor {
public IDTable getIdTable() {
return idTable;
}
+
+ public void stopWalTrim() {
+ trimWalService.shutdownNow();
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index 92b78bb..ece0410 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -489,6 +489,15 @@ public class StorageGroupManager {
}
}
+ public void stopWalTrimPool() {
+ for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+ this.virtualStorageGroupProcessor) {
+ if (virtualStorageGroupProcessor != null) {
+ virtualStorageGroupProcessor.stopWalTrim();
+ }
+ }
+ }
+
public void setSettling(boolean settling) {
isSettling.set(settling);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823..7d2006c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -83,7 +83,8 @@ public class BasicServiceProvider {
protected IPlanExecutor executor;
public BasicServiceProvider() throws QueryProcessException {
- queryFrequencyRecorder = new QueryFrequencyRecorder(CONFIG);
+ queryFrequencyRecorder = QueryFrequencyRecorder.getInstance();
+ queryFrequencyRecorder.reset();
processor = new Planner();
executor = new PlanExecutor();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
index 0c5cf6f..3c43d63 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.basic;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,19 @@ public class QueryFrequencyRecorder {
private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
private static final AtomicInteger QUERY_COUNT = new AtomicInteger(0);
- public QueryFrequencyRecorder(IoTDBConfig config) {
+ public static QueryFrequencyRecorder getInstance() {
+ return QueryFrequencyRecorder.InstanceHolder.instance;
+ }
+
+ private static class InstanceHolder {
+
+ private InstanceHolder() {}
+
+ private static QueryFrequencyRecorder instance =
+ new QueryFrequencyRecorder(IoTDBDescriptor.getInstance().getConfig());
+ }
+
+ private QueryFrequencyRecorder(IoTDBConfig config) {
ScheduledExecutorService timedQuerySqlCountThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
timedQuerySqlCountThread.scheduleAtFixedRate(
@@ -51,4 +64,8 @@ public class QueryFrequencyRecorder {
public void incrementAndGet() {
QUERY_COUNT.incrementAndGet();
}
+
+ public void reset() {
+ QUERY_COUNT.set(0);
+ }
}