You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/12 07:03:59 UTC

[iotdb] branch cluster- updated (c2f0cb6 -> 0c13adf)

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a change to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


    from c2f0cb6  continue for threadpool
     add 1462869  [IOTDB-1537]fix insertTablet permission (#3696)
     add 394467b  [IOTDB-1541] Change sequence of wal and memtable in insert (#3660)
     add 05cce67  [IOTDB-1552]Some basic operators should not be applied to text time series (#3714)
     add c5bf93a  Fix grafana user guide (#3715)
     add 9704a63  Fix bootstrap 1.72.0 download failed in CI because of the redirection by jfrog.io (#3720)
     add cb28489   [IOTDB-1540] Bug Fix: 500 when using IN operator (#3718)
     add b6538cd  [IOTDB-1485] Replace tsfile_size_threshold by unseq_tsfile_size/seq_tsfile_size (#3702)
     new 936c9eb  merge with master
     new 9f71122  try to fix tests
     new 0c13adf  continue to fix tests

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/client.yml                       |  11 +-
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     |  32 ++-
 .../iotdb/cluster/coordinator/Coordinator.java     |  12 +-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |  11 +-
 .../log/manage/PartitionedSnapshotLogManager.java  |   5 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |  23 +-
 .../apache/iotdb/cluster/metadata/CMManager.java   |   3 +
 .../iotdb/cluster/query/ClusterPlanExecutor.java   |  15 +-
 .../cluster/query/fill/ClusterPreviousFill.java    |   4 +-
 .../query/last/ClusterLastQueryExecutor.java       |   5 +-
 .../iotdb/cluster/server/ClusterRPCService.java    |  26 +--
 .../cluster/server/PullSnapshotHintService.java    |   4 +-
 .../cluster/server/member/DataGroupMember.java     |  15 +-
 .../cluster/server/member/MetaGroupMember.java     |   1 +
 .../iotdb/cluster/server/member/RaftMember.java    |  21 +-
 .../cluster/server/raft/AbstractRaftService.java   |   5 +-
 .../server/service/DataGroupServiceImpls.java      |  38 ++-
 .../cluster/server/service/MetaAsyncService.java   |  30 ++-
 .../cluster/integration/BaseSingleNodeTest.java    |  14 +-
 .../iotdb/cluster/integration/SingleNodeTest.java  |   8 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   2 +-
 docs/UserGuide/Ecosystem Integration/Grafana.md    |  17 ++
 docs/zh/UserGuide/Ecosystem Integration/Grafana.md |  32 ++-
 grafana/readme.md                                  |  17 ++
 grafana/readme_zh.md                               |  23 ++
 .../resources/conf/iotdb-engine.properties         |  12 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   1 +
 .../db/concurrent/IoTDBThreadPoolFactory.java      |  28 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  21 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  36 ++-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  14 +-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   6 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  52 ++++-
 .../db/qp/logical/crud/BasicFunctionOperator.java  |  22 +-
 .../iotdb/db/qp/logical/crud/InOperator.java       |   4 +
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |   7 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  20 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   5 +-
 .../org/apache/iotdb/db/service/ServiceType.java   |   1 +
 .../org/apache/iotdb/db/integration/IoTDBInIT.java | 255 +++++++++++++++++++++
 .../iotdb/db/integration/IoTDBQueryDemoIT.java     |  62 +++++
 .../iotdb/db/integration/IoTDBRestartIT.java       |   9 +-
 .../dataset/groupby/GroupByFillDataSetTest.java    |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   6 +-
 .../apache/iotdb/spark/db/EnvironmentUtils.java    |   6 +-
 45 files changed, 776 insertions(+), 167 deletions(-)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/integration/IoTDBInIT.java

[iotdb] 01/03: merge with master

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 936c9eb493f0d1e28b1d37173e6686316db358d4
Merge: c2f0cb6 b6538cd
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Wed Aug 11 19:03:05 2021 +0800

    merge with master

 .github/workflows/client.yml                       |  11 +-
 docs/UserGuide/Ecosystem Integration/Grafana.md    |  17 ++
 docs/zh/UserGuide/Ecosystem Integration/Grafana.md |  32 ++-
 grafana/readme.md                                  |  17 ++
 grafana/readme_zh.md                               |  23 ++
 .../resources/conf/iotdb-engine.properties         |  12 +-
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  21 +-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  36 ++-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  14 +-
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   6 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  52 ++++-
 .../db/qp/logical/crud/BasicFunctionOperator.java  |  22 +-
 .../iotdb/db/qp/logical/crud/InOperator.java       |   4 +
 .../iotdb/db/qp/logical/crud/QueryOperator.java    |   7 +-
 .../qp/strategy/optimizer/ConcatPathOptimizer.java |  20 +-
 .../java/org/apache/iotdb/db/service/IoTDB.java    |   5 +-
 .../org/apache/iotdb/db/integration/IoTDBInIT.java | 255 +++++++++++++++++++++
 .../iotdb/db/integration/IoTDBQueryDemoIT.java     |  62 +++++
 .../iotdb/db/integration/IoTDBRestartIT.java       |   9 +-
 .../dataset/groupby/GroupByFillDataSetTest.java    |   2 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |   6 +-
 .../apache/iotdb/spark/db/EnvironmentUtils.java    |   6 +-
 23 files changed, 578 insertions(+), 62 deletions(-)


[iotdb] 02/03: try to fix tests

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9f71122d8adad47c5ebdfe1ed35a2c0acfcadb61
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Aug 12 00:11:26 2021 +0800

    try to fix tests
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 27 ++++++++++-----
 .../iotdb/cluster/coordinator/Coordinator.java     | 13 +++++---
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 11 +++---
 .../log/manage/PartitionedSnapshotLogManager.java  |  6 ++--
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 23 +++----------
 .../apache/iotdb/cluster/metadata/CMManager.java   |  3 ++
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 15 ++++++++-
 .../cluster/query/fill/ClusterPreviousFill.java    |  4 ++-
 .../query/last/ClusterLastQueryExecutor.java       |  5 +--
 .../iotdb/cluster/server/ClusterRPCService.java    | 26 +++++----------
 .../cluster/server/PullSnapshotHintService.java    |  4 +--
 .../cluster/server/member/DataGroupMember.java     | 16 +++++----
 .../cluster/server/member/MetaGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 21 +++++-------
 .../cluster/server/raft/AbstractRaftService.java   |  4 +--
 .../server/service/DataGroupServiceImpls.java      | 39 ++++++++++++++++++++--
 .../cluster/server/service/MetaAsyncService.java   | 31 +++++++++++++----
 .../cluster/integration/BaseSingleNodeTest.java    | 15 +++++----
 .../cluster/server/member/MetaGroupMemberTest.java |  3 +-
 .../db/concurrent/IoTDBThreadPoolFactory.java      | 28 ++++++++++++++++
 .../org/apache/iotdb/db/service/ServiceType.java   |  1 +
 21 files changed, 192 insertions(+), 105 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 332c7e1..5913b80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
 import org.apache.iotdb.cluster.server.HardLinkCleaner;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
@@ -47,16 +48,17 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.cluster.server.service.MetaSyncService;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.RegisterManager;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.db.utils.TestOnly;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.TAsyncClientManager;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -68,7 +70,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -113,13 +114,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
    * of all raft members in this node
    */
   private ScheduledExecutorService reportThread =
-      Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread"));
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
 
   private boolean allowReport = true;
 
   /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread =
-      Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner"));
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
 
   // currently, dataClientProvider is only used for those instances who do not belong to any
   // DataGroup..
@@ -127,6 +128,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
   private DataClientProvider dataClientProvider;
 
   private ClusterIoTDB() {
+    // we do not init anything here, so that we can re-initialize the instance in IT.
+  }
+
+  public void initLocalEngines() {
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
     thisNode = new Node();
     // set internal rpc ip and ports
@@ -137,9 +142,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     thisNode.setClientPort(config.getClusterRpcPort());
     thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
     coordinator = new Coordinator();
-  }
-
-  public void initLocalEngines() {
     // local engine
     TProtocolFactory protocolFactory =
         ThriftServiceThread.getProtocolFactory(
@@ -256,6 +258,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       // TODO fixme it is better to remove coordinator out of metaGroupEngine
 
       registerManager.register(metaGroupEngine);
+      registerManager.register(dataGroupEngine);
 
       // rpc service initialize
       if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
@@ -291,8 +294,15 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       registerManager.register(ClusterMonitor.INSTANCE);
       // we must wait until the metaGroup established.
       // So that the ClusterRPCService can work.
+      ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
+      clusterRPCServiceImpl.setCoordinator(coordinator);
+      clusterRPCServiceImpl.setExecutor(metaGroupEngine);
+      ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
       registerManager.register(ClusterRPCService.getInstance());
-    } catch (StartupException | StartUpCheckFailureException | ConfigInconsistentException e) {
+    } catch (StartupException
+        | StartUpCheckFailureException
+        | ConfigInconsistentException
+        | QueryProcessException e) {
       logger.error("Fail to start  server", e);
       stop();
     }
@@ -497,7 +507,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
   private void deactivate() {
     logger.info("Deactivating Cluster IoTDB...");
-    // metaServer.stop();
     stopThreadPools();
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 5066908..68518b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -58,11 +58,11 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,16 +92,19 @@ public class Coordinator {
       "The following errors occurred when executing "
           + "the query, please retry or contact the DBA: ";
 
+  @TestOnly
   public Coordinator(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-    this.name = metaGroupMember.getName();
-    this.thisNode = metaGroupMember.getThisNode();
+    linkMetaGroupMember(metaGroupMember);
   }
 
   public Coordinator() {}
 
-  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+  public void linkMetaGroupMember(MetaGroupMember metaGroupMember) {
     this.metaGroupMember = metaGroupMember;
+    if (metaGroupMember.getCoordinator() != null && metaGroupMember.getCoordinator() != this) {
+      logger.warn("MetadataGroupMember linked inconsistent Coordinator, will correct it.");
+      metaGroupMember.setCoordinator(this);
+    }
     this.name = metaGroupMember.getName();
     this.thisNode = metaGroupMember.getThisNode();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..2ce4d76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,11 +30,11 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.TestOnly;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -46,7 +46,6 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,13 +68,13 @@ public class LogDispatcher {
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
-      Executors.newFixedThreadPool(
-          Runtime.getRuntime().availableProcessors(),
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+      IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
+          Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
-    executorService = Executors.newCachedThreadPool();
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 20535fe..c2e3c1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +68,10 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
       Node thisNode,
       SnapshotFactory<T> factory,
       DataGroupMember dataGroupMember) {
-    super(new SyncLogDequeSerializer(header.nodeIdentifier), logApplier, header.toString());
+    super(
+        new SyncLogDequeSerializer(header.nodeIdentifier),
+        logApplier,
+        Integer.toString(header.getNodeIdentifier()));
     this.partitionTable = partitionTable;
     this.factory = factory;
     this.thisNode = thisNode;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 322b541..be05af4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +44,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public abstract class RaftLogManager {
@@ -140,19 +138,10 @@ public abstract class RaftLogManager {
     this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
 
     this.deleteLogExecutorService =
-        new ScheduledThreadPoolExecutor(
-            1,
-            new BasicThreadFactory.Builder()
-                .namingPattern("raft-log-delete-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newScheduledThreadPoolWithDaemon(1, "raft-log-delete-" + name);
 
     this.checkLogApplierExecutorService =
-        Executors.newSingleThreadExecutor(
-            new BasicThreadFactory.Builder()
-                .namingPattern("check-log-applier-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
 
     /** deletion check period of the submitted log */
     int logDeleteCheckIntervalSecond =
@@ -763,11 +752,7 @@ public abstract class RaftLogManager {
     this.blockAppliedCommitIndex = -1;
     this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
     this.checkLogApplierExecutorService =
-        Executors.newSingleThreadExecutor(
-            new BasicThreadFactory.Builder()
-                .namingPattern("check-log-applier-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
     this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex);
     for (int i = 0; i < logUpdateConditions.length; i++) {
       logUpdateConditions[i] = new Object();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index f6b401f..b4e920c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1281,6 +1281,9 @@ public class CMManager extends MManager {
       List<PartialPath> originalPaths) {
     ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>();
     ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService getAllPathsService =
         Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size());
     for (PartialPath pathStr : originalPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 9884385..78892cdf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -200,7 +200,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
     if (groupPathMap.isEmpty()) {
       return result.get();
     }
-
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
     List<Future<Void>> remoteFutures = new ArrayList<>();
     // query each data group separately
@@ -301,6 +303,10 @@ public class ClusterPlanExecutor extends PlanExecutor {
       throws MetadataException {
 
     ConcurrentSkipListSet<PartialPath> nodeSet = new ConcurrentSkipListSet<>();
+
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
     List<Future<Void>> futureList = new ArrayList<>();
@@ -319,6 +325,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
                 return null;
               }));
     }
+    // TODO: there seems to be a long-running block here.
     waitForThreadPool(futureList, pool, "getNodesList()");
     return new ArrayList<>(nodeSet);
   }
@@ -398,6 +405,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
   protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
     ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
     List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
     List<Future<Void>> futureList = new ArrayList<>();
     for (PartitionGroup group : globalGroups) {
@@ -489,6 +499,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
   @Override
   protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
     ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
     List<Future<Void>> futureList = new ArrayList<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 585b841..933d145 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -121,7 +121,9 @@ public class ClusterPreviousFill extends PreviousFill {
     }
     CountDownLatch latch = new CountDownLatch(partitionGroups.size());
     PreviousFillHandler handler = new PreviousFillHandler(latch);
-
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // BUT is it suitable to create a new thread pool on every call?
     ExecutorService fillService = Executors.newFixedThreadPool(partitionGroups.size());
     PreviousFillArguments arguments =
         new PreviousFillArguments(path, dataType, queryTime, beforeRange, deviceMeasurements);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 3ed7837..b4e9d5d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -58,7 +59,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 public class ClusterLastQueryExecutor extends LastQueryExecutor {
@@ -67,7 +67,8 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
   private MetaGroupMember metaGroupMember;
 
   private static ExecutorService lastQueryPool =
-      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+      IoTDBThreadPoolFactory.newFixedThreadPool(
+          Runtime.getRuntime().availableProcessors(), "ClusterLastQuery");
 
   public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember metaGroupMember) {
     super(lastQueryPlan);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index b8a2c00..fa2a2c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,12 +20,9 @@
 package org.apache.iotdb.cluster.server;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.coordinator.Coordinator;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.service.RPCServiceThriftHandler;
 import org.apache.iotdb.db.service.ServiceType;
@@ -50,12 +47,15 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
   }
 
   @Override
-  public void initTProcessor() throws IllegalAccessException, InstantiationException {
-    try {
-      impl = new ClusterTSServiceImpl();
-      initSyncedServiceImpl(null);
-    } catch (QueryProcessException e) {
-      throw new InstantiationException(e.getMessage());
+  public void initSyncedServiceImpl(Object serviceImpl) {
+    impl = (ClusterTSServiceImpl) serviceImpl;
+    super.initSyncedServiceImpl(serviceImpl);
+  }
+
+  @Override
+  public void initTProcessor() throws InstantiationException {
+    if (impl == null) {
+      throw new InstantiationException("ClusterTSServiceImpl is null");
     }
     processor = new Processor<>(impl);
   }
@@ -100,14 +100,6 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
     return ClusterRPCServiceHolder.INSTANCE;
   }
 
-  public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException {
-    this.impl.setExecutor(member);
-  }
-
-  public void assignCoordinator(Coordinator coordinator) {
-    this.impl.setCoordinator(coordinator);
-  }
-
   private static class ClusterRPCServiceHolder {
 
     private static final ClusterRPCService INSTANCE = new ClusterRPCService();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index c137027..ed99a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -56,7 +56,7 @@ public class PullSnapshotHintService {
   }
 
   public void start() {
-    this.service = Executors.newScheduledThreadPool(1);
+    this.service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "PullSnapshotHint");
     this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index aaba50f..0b1abca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -94,7 +95,6 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
-
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +118,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
@@ -184,14 +183,15 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   }
 
   DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
+    // The name is used in JMX, so we must avoid using "(" "," "=" ")"
     super(
-        "Data("
+        "Data-"
             + nodes.getHeader().getNode().getInternalIp()
-            + ":"
+            + "-"
             + nodes.getHeader().getNode().getDataPort()
-            + ", raftId="
+            + "-raftId-"
             + nodes.getId()
-            + ")",
+            + "",
         new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataClient.FactorySync(factory)),
         new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
@@ -235,7 +235,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     JMXService.registerMBean(this, mbeanName);
     super.start();
     heartBeatService.submit(new DataHeartbeatThread(this));
-    pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    pullSnapshotService =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "pullSnapshot");
     pullSnapshotHintService = new PullSnapshotHintService(this);
     pullSnapshotHintService.start();
     resumePullSnapshotTasks();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index b3dcc7a..429459d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -96,7 +96,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -258,6 +257,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
 
     // try loading the partition table if there was a previous cluster
     this.coordinator = coordinator;
+    coordinator.linkMetaGroupMember(this);
     loadPartitionTable();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d714540..3bd68ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server.member;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -64,6 +63,7 @@ import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.IoTDBException;
@@ -81,6 +81,7 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,7 +101,6 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -289,16 +289,12 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   void startBackGroundThreads() {
-    heartBeatService =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> new Thread(r, name + "-HeartbeatThread@" + System.currentTimeMillis()));
-    catchUpService =
-        Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder().setNameFormat(getName() + "-CatchUpThread%d").build());
+    heartBeatService = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name + "-Heartbeat");
+
+    catchUpService = IoTDBThreadPoolFactory.newCachedThreadPool(name + "-CatchUp");
     appendLogThreadPool =
-        Executors.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors() * 10,
-            new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors() * 10, name + "-AppendLog");
     if (!config.isUseAsyncServer()) {
       serialToParallelPool =
           IoTDBThreadPoolFactory.newThreadPool(
@@ -307,7 +303,8 @@ public abstract class RaftMember implements RaftMemberMBean {
               1000L,
               TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<>(),
-              new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build());
+              new IoTThreadFactory(getName() + "-SerialToParallel"),
+              getName() + "-SerialToParallel");
     }
     commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
index d62024b..2a13550 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -19,12 +19,12 @@
 package org.apache.iotdb.cluster.server.raft;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.service.thrift.ThriftService;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.thrift.TBaseAsyncProcessor;
 
 public abstract class AbstractRaftService extends ThriftService {
 
@@ -36,7 +36,7 @@ public abstract class AbstractRaftService extends ThriftService {
       if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
         thriftServiceThread =
             new ThriftServiceThread(
-                (TSMetaService.AsyncProcessor) processor,
+                (TBaseAsyncProcessor) processor,
                 getID().getName(),
                 clientThreadName,
                 getBindIP(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index b355766..5eec743 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -32,16 +32,39 @@ import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
+import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
+import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.StoppedMemberManager;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -62,7 +85,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class DataGroupServiceImpls
-    implements TSDataService.AsyncIface, TSDataService.Iface, DataGroupServiceImplsMBean {
+    implements TSDataService.AsyncIface, TSDataService.Iface, IService, DataGroupServiceImplsMBean {
 
   private static final Logger logger = LoggerFactory.getLogger(DataGroupServiceImpls.class);
 
@@ -94,6 +117,11 @@ public class DataGroupServiceImpls
     this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
   }
 
+  @Override
+  public void start() throws StartupException {
+    // seems to do nothing
+  }
+
   //  @Override
   // TODO
   public void stop() {
@@ -103,6 +131,11 @@ public class DataGroupServiceImpls
     }
   }
 
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_DATA_ENGINE;
+  }
+
   /**
    * Add a DataGroupMember into this server, if a member with the same header exists, the old member
    * will be stopped and replaced by the new one.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4f547a3..8114425 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 
 public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface {
-
+  private static final String ERROR_MSG_META_NOT_READY = "The metadata not is not ready.";
   private static final Logger logger = LoggerFactory.getLogger(MetaAsyncService.class);
 
   private MetaGroupMember metaGroupMember;
@@ -59,11 +58,19 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
 
   @Override
   public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
-    if (metaGroupMember.getPartitionTable() == null) {
-      // this node lacks information of the cluster and refuse to work
-      logger.debug("This node is blind to the cluster and cannot accept logs");
-      resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
-      return;
+    // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
+    // locally, but the partition table is not verified), we do not handle the RPC requests.
+    if (!metaGroupMember.isReady()) {
+      // the only special case is that the leader will send an empty entry to let followers
+      // submit previous logs.
+      // At this time, the partitionTable has been loaded but is not verified, so the RPC is not
+      // ready.
+      if (metaGroupMember.getPartitionTable() == null) {
+        // this node lacks information of the cluster and refuse to work
+        logger.debug("This node is blind to the cluster and cannot accept logs");
+        resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
+        return;
+      }
     }
 
     super.appendEntry(request, resultHandler);
@@ -72,6 +79,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
   @Override
   public void addNode(
       Node node, StartUpStatus startUpStatus, AsyncMethodCallback<AddNodeResponse> resultHandler) {
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+      return;
+    }
     AddNodeResponse addNodeResponse = null;
     try {
       addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
@@ -160,6 +172,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
 
   @Override
   public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+      return;
+    }
     long result;
     try {
       result = metaGroupMember.removeNode(node);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 541af9e..cedaf7d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@ -19,13 +19,12 @@
 
 package org.apache.iotdb.cluster.integration;
 
+import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.Constants;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.session.Session;
-
 import org.junit.After;
 import org.junit.Before;
 
@@ -34,7 +33,7 @@ import java.util.List;
 
 public abstract class BaseSingleNodeTest {
 
-  private MetaGroupMember metaServer;
+  private ClusterIoTDB daemon;
 
   private boolean useAsyncServer;
   private List<String> seedNodeUrls;
@@ -44,24 +43,26 @@ public abstract class BaseSingleNodeTest {
   @Before
   public void setUp() throws Exception {
     initConfigs();
-    metaServer = new MetaGroupMember();
-    metaServer.start();
-    metaServer.buildCluster();
+    daemon = ClusterIoTDB.getInstance();
+    daemon.initLocalEngines();
+    daemon.activeStartNodeMode();
   }
 
   @After
   public void tearDown() throws Exception {
     // TODO fixme
-    metaServer.stop();
+    daemon.stop();
     recoverConfigs();
     EnvironmentUtils.cleanEnv();
   }
 
   private void initConfigs() {
+    // remember the original values
     useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
     seedNodeUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
     replicaNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
     autoCreateSchema = ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema();
+    // set the cluster as a single node cluster.
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
     ClusterDescriptor.getInstance()
         .getConfig()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 90884f2..73eb793 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -108,7 +108,6 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol.Factory;
@@ -538,7 +537,7 @@ public class MetaGroupMemberTest extends BaseMember {
             }
           }
         };
-    metaGroupMember.getCoordinator().setMetaGroupMember(metaGroupMember);
+    metaGroupMember.getCoordinator().linkMetaGroupMember(metaGroupMember);
     metaGroupMember.setLeader(node);
     metaGroupMember.setAllNodes(allNodes);
     metaGroupMember.setCharacter(NodeCharacter.LEADER);
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 72dacc9..cdd31a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService
 import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService;
 import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor;
 import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor;
+
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,19 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  public static ExecutorService newFixedThreadPoolWithDaemonThread(int nThreads, String poolName) {
+    logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
+
+    return new WrappedThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(),
+        new IoTDBDaemonThreadFactory(poolName),
+        poolName);
+  }
+
   public static ExecutorService newFixedThreadPool(
       int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
@@ -89,6 +103,12 @@ public class IoTDBThreadPoolFactory {
         Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName);
   }
 
+  public static ExecutorService newSingleThreadExecutorWithDaemon(String poolName) {
+    logger.info("new single thread pool: {}", poolName);
+    return new WrappedSingleThreadExecutorService(
+        Executors.newSingleThreadExecutor(new IoTDBDaemonThreadFactory(poolName)), poolName);
+  }
+
   public static ExecutorService newSingleThreadExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new single thread pool: {}", poolName);
@@ -172,6 +192,14 @@ public class IoTDBThreadPoolFactory {
         Executors.newScheduledThreadPool(corePoolSize, new IoTThreadFactory(poolName)), poolName);
   }
 
+  public static ScheduledExecutorService newScheduledThreadPoolWithDaemon(
+      int corePoolSize, String poolName) {
+    logger.info("new scheduled thread pool: {}", poolName);
+    return new WrappedScheduledExecutorService(
+        Executors.newScheduledThreadPool(corePoolSize, new IoTDBDaemonThreadFactory(poolName)),
+        poolName);
+  }
+
   public static ScheduledExecutorService newScheduledThreadPool(
       int corePoolSize, String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new scheduled thread pool: {}", poolName);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 34d892c..55e99bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -64,6 +64,7 @@ public enum ServiceType {
   CLUSTER_DATA_HEART_BEAT_RPC_SERVICE(
       "Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
   CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
+  CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
   ;
 
   private final String name;

[iotdb] 03/03: contine to fix tests

Posted by hx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0c13adf1f9e9af0c35686decc0d4a70c0fd506bb
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Aug 12 10:44:11 2021 +0800

    contine to fix tests
---
 cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java | 9 +++++----
 .../java/org/apache/iotdb/cluster/coordinator/Coordinator.java   | 1 +
 .../iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java  | 1 +
 .../org/apache/iotdb/cluster/server/member/DataGroupMember.java  | 1 +
 .../org/apache/iotdb/cluster/server/member/MetaGroupMember.java  | 1 +
 .../apache/iotdb/cluster/server/raft/AbstractRaftService.java    | 1 +
 .../iotdb/cluster/server/service/DataGroupServiceImpls.java      | 1 +
 .../apache/iotdb/cluster/server/service/MetaAsyncService.java    | 1 +
 .../org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java | 1 +
 .../org/apache/iotdb/cluster/integration/SingleNodeTest.java     | 8 ++------
 .../apache/iotdb/cluster/server/member/MetaGroupMemberTest.java  | 1 +
 11 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 5913b80..0201c6c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -113,14 +113,12 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
    * a single thread pool, every "REPORT_INTERVAL_SEC" seconds, "reportThread" will print the status
    * of all raft members in this node
    */
-  private ScheduledExecutorService reportThread =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
+  private ScheduledExecutorService reportThread;
 
   private boolean allowReport = true;
 
   /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
-  private ScheduledExecutorService hardLinkCleanerThread =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
+  private ScheduledExecutorService hardLinkCleanerThread;
 
   // currently, dataClientProvider is only used for those instances who do not belong to any
   // DataGroup..
@@ -159,11 +157,14 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
   }
 
   private void initTasks() {
+    reportThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
     reportThread.scheduleAtFixedRate(
         this::generateNodeReport,
         ClusterConstant.REPORT_INTERVAL_SEC,
         ClusterConstant.REPORT_INTERVAL_SEC,
         TimeUnit.SECONDS);
+    hardLinkCleanerThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
     hardLinkCleanerThread.scheduleAtFixedRate(
         new HardLinkCleaner(),
         ClusterConstant.CLEAN_HARDLINK_INTERVAL_SEC,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 68518b1..c4ec857 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index c2e3c1b..af39472 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 0b1abca..287f24a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -95,6 +95,7 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
+
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 429459d..3029a18 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -96,6 +96,7 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
index 2a13550..643b05c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.service.thrift.ThriftService;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+
 import org.apache.thrift.TBaseAsyncProcessor;
 
 public abstract class AbstractRaftService extends ThriftService {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index 5eec743..ac660dd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -65,6 +65,7 @@ import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TProtocolFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 8114425..ab3a6a2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index cedaf7d..feb010c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.utils.Constants;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.session.Session;
+
 import org.junit.After;
 import org.junit.Before;
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
index 690dd48..f806a9d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/SingleNodeTest.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.jdbc.Config;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,10 +35,7 @@ import java.sql.Statement;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class SingleNodeTest extends BaseSingleNodeTest {
 
@@ -55,10 +51,10 @@ public class SingleNodeTest extends BaseSingleNodeTest {
   @Override
   @After
   public void tearDown() throws Exception {
-    super.tearDown();
     if (session != null) {
       session.close();
     }
+    super.tearDown();
   }
 
   @Test
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 73eb793..016a722 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -108,6 +108,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol.Factory;