You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2022/01/05 07:46:11 UTC

[iotdb] branch client_manager_add_close updated: Fix QueryFrequencyRecorder thread leaking Fix WalTrim thread leaking Add thread names

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch client_manager_add_close
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/client_manager_add_close by this push:
     new 0dfe0b1  Fix QueryFrequencyRecorder thread leaking Fix WalTrim thread leaking Add thread names
0dfe0b1 is described below

commit 0dfe0b14bcfb4802b88b1401f00d47d821cd2751
Author: jt <jt...@163.com>
AuthorDate: Wed Jan 5 15:44:47 2022 +0800

    Fix QueryFrequencyRecorder thread leaking
    Fix WalTrim thread leaking
    Add thread names
---
 .../iotdb/cluster/server/StoppedMemberManager.java    |  1 -
 .../server/clusterinfo/ClusterInfoServerTest.java     |  4 ----
 .../clusterinfo/ClusterInfoServiceImplTest.java       |  3 ---
 .../org/apache/iotdb/db/engine/StorageEngine.java     | 14 ++++++++++----
 .../storagegroup/VirtualStorageGroupProcessor.java    | 15 ++++++++++++---
 .../storagegroup/virtualSg/StorageGroupManager.java   |  9 +++++++++
 .../iotdb/db/service/basic/BasicServiceProvider.java  |  3 ++-
 .../db/service/basic/QueryFrequencyRecorder.java      | 19 ++++++++++++++++++-
 8 files changed, 51 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
index d5ed278..14f6efe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/StoppedMemberManager.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
-import org.apache.iotdb.db.utils.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
index b3689fd..4ea3e65 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServerTest.java
@@ -21,8 +21,6 @@ package org.apache.iotdb.cluster.server.clusterinfo;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -34,8 +32,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-
 public class ClusterInfoServerTest {
 
   ClusterInfoServiceImplTest test;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
index cce2f05..00c1377 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServiceImplTest.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMemberTest;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 
@@ -35,7 +33,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 35900c7..b7cad9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.engine;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.ServerConfigConsistent;
@@ -94,6 +95,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 public class StorageEngine implements IService {
+
   private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -273,7 +275,7 @@ public class StorageEngine implements IService {
 
     recover();
 
-    ttlCheckThread = Executors.newSingleThreadScheduledExecutor();
+    ttlCheckThread = Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CheckTTL"));
     ttlCheckThread.scheduleAtFixedRate(
         this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
     logger.info("start ttl check thread successfully.");
@@ -296,7 +298,8 @@ public class StorageEngine implements IService {
   private void startTimedService() {
     // timed flush sequence memtable
     if (config.isEnableTimedFlushSeqMemtable()) {
-      seqMemtableTimedFlushCheckThread = Executors.newSingleThreadScheduledExecutor();
+      seqMemtableTimedFlushCheckThread =
+          Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("FlushSeqMemTable"));
       seqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
           this::timedFlushSeqMemTable,
           config.getSeqMemtableFlushCheckInterval(),
@@ -306,7 +309,8 @@ public class StorageEngine implements IService {
     }
     // timed flush unsequence memtable
     if (config.isEnableTimedFlushUnseqMemtable()) {
-      unseqMemtableTimedFlushCheckThread = Executors.newSingleThreadScheduledExecutor();
+      unseqMemtableTimedFlushCheckThread =
+          Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("FlushUnseqMemTable"));
       unseqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
           this::timedFlushUnseqMemTable,
           config.getUnseqMemtableFlushCheckInterval(),
@@ -316,7 +320,8 @@ public class StorageEngine implements IService {
     }
     // timed close tsfile
     if (config.isEnableTimedCloseTsFile()) {
-      tsFileTimedCloseCheckThread = Executors.newSingleThreadScheduledExecutor();
+      tsFileTimedCloseCheckThread =
+          Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CloseTsFile"));
       tsFileTimedCloseCheckThread.scheduleAtFixedRate(
           this::timedCloseTsFileProcessor,
           config.getCloseTsFileCheckInterval(),
@@ -360,6 +365,7 @@ public class StorageEngine implements IService {
   public void stop() {
     for (StorageGroupManager storageGroupManager : processorMap.values()) {
       storageGroupManager.stopCompactionSchedulerPool();
+      storageGroupManager.stopWalTrimPool();
     }
     syncCloseAllProcessor();
     stopTimedService(ttlCheckThread, "TTlCheckThread");
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
index df72353..6c174f3 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/VirtualStorageGroupProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -271,12 +272,14 @@ public class VirtualStorageGroupProcessor {
   private String insertWriteLockHolder = "";
 
   private ScheduledExecutorService timedCompactionScheduleTask =
-      Executors.newSingleThreadScheduledExecutor();
+      Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("CompactionSchedule"));
 
   public static final long COMPACTION_TASK_SUBMIT_DELAY = 20L * 1000L;
 
   private IDTable idTable;
 
+  private ScheduledExecutorService trimWalService;
+
   /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
@@ -386,8 +389,9 @@ public class VirtualStorageGroupProcessor {
       logger.error("create Storage Group system Directory {} failed", storageGroupSysDir.getPath());
     }
 
-    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-    executorService.scheduleWithFixedDelay(
+    trimWalService =
+        Executors.newSingleThreadScheduledExecutor(new IoTThreadFactory("TrimWalBuffer"));
+    trimWalService.scheduleWithFixedDelay(
         this::trimTask,
         config.getWalPoolTrimIntervalInMS(),
         config.getWalPoolTrimIntervalInMS(),
@@ -3212,6 +3216,7 @@ public class VirtualStorageGroupProcessor {
 
   @FunctionalInterface
   public interface CompactionRecoverCallBack {
+
     void call();
   }
 
@@ -3239,4 +3244,8 @@ public class VirtualStorageGroupProcessor {
   public IDTable getIdTable() {
     return idTable;
   }
+
+  public void stopWalTrim() {
+    trimWalService.shutdownNow();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
index 92b78bb..ece0410 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/StorageGroupManager.java
@@ -489,6 +489,15 @@ public class StorageGroupManager {
     }
   }
 
+  public void stopWalTrimPool() {
+    for (VirtualStorageGroupProcessor virtualStorageGroupProcessor :
+        this.virtualStorageGroupProcessor) {
+      if (virtualStorageGroupProcessor != null) {
+        virtualStorageGroupProcessor.stopWalTrim();
+      }
+    }
+  }
+
   public void setSettling(boolean settling) {
     isSettling.set(settling);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
index 6f8e823..7d2006c 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/BasicServiceProvider.java
@@ -83,7 +83,8 @@ public class BasicServiceProvider {
   protected IPlanExecutor executor;
 
   public BasicServiceProvider() throws QueryProcessException {
-    queryFrequencyRecorder = new QueryFrequencyRecorder(CONFIG);
+    queryFrequencyRecorder = QueryFrequencyRecorder.getInstance();
+    queryFrequencyRecorder.reset();
     processor = new Planner();
     executor = new PlanExecutor();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
index 0c5cf6f..3c43d63 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/basic/QueryFrequencyRecorder.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.service.basic;
 
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,19 @@ public class QueryFrequencyRecorder {
   private static final Logger QUERY_FREQUENCY_LOGGER = LoggerFactory.getLogger("QUERY_FREQUENCY");
   private static final AtomicInteger QUERY_COUNT = new AtomicInteger(0);
 
-  public QueryFrequencyRecorder(IoTDBConfig config) {
+  public static QueryFrequencyRecorder getInstance() {
+    return QueryFrequencyRecorder.InstanceHolder.instance;
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {}
+
+    private static QueryFrequencyRecorder instance =
+        new QueryFrequencyRecorder(IoTDBDescriptor.getInstance().getConfig());
+  }
+
+  private QueryFrequencyRecorder(IoTDBConfig config) {
     ScheduledExecutorService timedQuerySqlCountThread =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("timedQuerySqlCount");
     timedQuerySqlCountThread.scheduleAtFixedRate(
@@ -51,4 +64,8 @@ public class QueryFrequencyRecorder {
   public void incrementAndGet() {
     QUERY_COUNT.incrementAndGet();
   }
+
+  public void reset() {
+    QUERY_COUNT.set(0);
+  }
 }