You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/12 07:04:01 UTC

[iotdb] 02/03: try to fix tests

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9f71122d8adad47c5ebdfe1ed35a2c0acfcadb61
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Thu Aug 12 00:11:26 2021 +0800

    try to fix tests
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 27 ++++++++++-----
 .../iotdb/cluster/coordinator/Coordinator.java     | 13 +++++---
 .../apache/iotdb/cluster/log/LogDispatcher.java    | 11 +++---
 .../log/manage/PartitionedSnapshotLogManager.java  |  6 ++--
 .../iotdb/cluster/log/manage/RaftLogManager.java   | 23 +++----------
 .../apache/iotdb/cluster/metadata/CMManager.java   |  3 ++
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 15 ++++++++-
 .../cluster/query/fill/ClusterPreviousFill.java    |  4 ++-
 .../query/last/ClusterLastQueryExecutor.java       |  5 +--
 .../iotdb/cluster/server/ClusterRPCService.java    | 26 +++++----------
 .../cluster/server/PullSnapshotHintService.java    |  4 +--
 .../cluster/server/member/DataGroupMember.java     | 16 +++++----
 .../cluster/server/member/MetaGroupMember.java     |  2 +-
 .../iotdb/cluster/server/member/RaftMember.java    | 21 +++++-------
 .../cluster/server/raft/AbstractRaftService.java   |  4 +--
 .../server/service/DataGroupServiceImpls.java      | 39 ++++++++++++++++++++--
 .../cluster/server/service/MetaAsyncService.java   | 31 +++++++++++++----
 .../cluster/integration/BaseSingleNodeTest.java    | 15 +++++----
 .../cluster/server/member/MetaGroupMemberTest.java |  3 +-
 .../db/concurrent/IoTDBThreadPoolFactory.java      | 28 ++++++++++++++++
 .../org/apache/iotdb/db/service/ServiceType.java   |  1 +
 21 files changed, 192 insertions(+), 105 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 332c7e1..5913b80 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotStrategy;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.ClusterRPCService;
+import org.apache.iotdb.cluster.server.ClusterTSServiceImpl;
 import org.apache.iotdb.cluster.server.HardLinkCleaner;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
@@ -47,16 +48,17 @@ import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.cluster.server.service.MetaSyncService;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfigCheck;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.JMXService;
 import org.apache.iotdb.db.service.RegisterManager;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
 import org.apache.iotdb.db.utils.TestOnly;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.TAsyncClientManager;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
@@ -68,7 +70,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -113,13 +114,13 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
    * of all raft members in this node
    */
   private ScheduledExecutorService reportThread =
-      Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "NodeReportThread"));
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("NodeReportThread");
 
   private boolean allowReport = true;
 
   /** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
   private ScheduledExecutorService hardLinkCleanerThread =
-      Executors.newSingleThreadScheduledExecutor(n -> new Thread(n, "HardLinkCleaner"));
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("HardLinkCleaner");
 
   // currently, dataClientProvider is only used for those instances who do not belong to any
   // DataGroup..
@@ -127,6 +128,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
   private DataClientProvider dataClientProvider;
 
   private ClusterIoTDB() {
+    // Nothing is initialized here, so that the instance can be re-initialized in integration tests.
+  }
+
+  public void initLocalEngines() {
     ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
     thisNode = new Node();
     // set internal rpc ip and ports
@@ -137,9 +142,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     thisNode.setClientPort(config.getClusterRpcPort());
     thisNode.setClientIp(IoTDBDescriptor.getInstance().getConfig().getRpcAddress());
     coordinator = new Coordinator();
-  }
-
-  public void initLocalEngines() {
     // local engine
     TProtocolFactory protocolFactory =
         ThriftServiceThread.getProtocolFactory(
@@ -256,6 +258,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       // TODO fixme it is better to remove coordinator out of metaGroupEngine
 
       registerManager.register(metaGroupEngine);
+      registerManager.register(dataGroupEngine);
 
       // rpc service initialize
       if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
@@ -291,8 +294,15 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       registerManager.register(ClusterMonitor.INSTANCE);
       // we must wait until the metaGroup established.
       // So that the ClusterRPCService can work.
+      ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
+      clusterRPCServiceImpl.setCoordinator(coordinator);
+      clusterRPCServiceImpl.setExecutor(metaGroupEngine);
+      ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
       registerManager.register(ClusterRPCService.getInstance());
-    } catch (StartupException | StartUpCheckFailureException | ConfigInconsistentException e) {
+    } catch (StartupException
+        | StartUpCheckFailureException
+        | ConfigInconsistentException
+        | QueryProcessException e) {
       logger.error("Fail to start  server", e);
       stop();
     }
@@ -497,7 +507,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
   private void deactivate() {
     logger.info("Deactivating Cluster IoTDB...");
-    // metaServer.stop();
     stopThreadPools();
     registerManager.deregisterAll();
     JMXService.deregisterMBean(mbeanName);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 5066908..68518b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -58,11 +58,11 @@ import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
 import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,16 +92,19 @@ public class Coordinator {
       "The following errors occurred when executing "
           + "the query, please retry or contact the DBA: ";
 
+  @TestOnly
   public Coordinator(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-    this.name = metaGroupMember.getName();
-    this.thisNode = metaGroupMember.getThisNode();
+    linkMetaGroupMember(metaGroupMember);
   }
 
   public Coordinator() {}
 
-  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
+  public void linkMetaGroupMember(MetaGroupMember metaGroupMember) {
     this.metaGroupMember = metaGroupMember;
+    if (metaGroupMember.getCoordinator() != null && metaGroupMember.getCoordinator() != this) {
+      logger.warn("MetadataGroupMember linked inconsistent Coordinator, will correct it.");
+      metaGroupMember.setCoordinator(this);
+    }
     this.name = metaGroupMember.getName();
     this.thisNode = metaGroupMember.getThisNode();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index a600c4a..2ce4d76 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -30,11 +30,11 @@ import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.TestOnly;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -46,7 +46,6 @@ import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,13 +68,13 @@ public class LogDispatcher {
   private List<BlockingQueue<SendLogRequest>> nodeLogQueues = new ArrayList<>();
   private ExecutorService executorService;
   private static ExecutorService serializationService =
-      Executors.newFixedThreadPool(
-          Runtime.getRuntime().availableProcessors(),
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DispatcherEncoder-%d").build());
+      IoTDBThreadPoolFactory.newFixedThreadPoolWithDaemonThread(
+          Runtime.getRuntime().availableProcessors(), "DispatcherEncoder");
 
   public LogDispatcher(RaftMember member) {
     this.member = member;
-    executorService = Executors.newCachedThreadPool();
+    executorService =
+        IoTDBThreadPoolFactory.newCachedThreadPool("LogDispatcher-" + member.getName());
     for (Node node : member.getAllNodes()) {
       if (!node.equals(member.getThisNode())) {
         nodeLogQueues.add(createQueueAndBindingThread(node));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
index 20535fe..c2e3c1b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/PartitionedSnapshotLogManager.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.db.metadata.mnode.IMNode;
 import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +68,10 @@ public abstract class PartitionedSnapshotLogManager<T extends Snapshot> extends
       Node thisNode,
       SnapshotFactory<T> factory,
       DataGroupMember dataGroupMember) {
-    super(new SyncLogDequeSerializer(header.nodeIdentifier), logApplier, header.toString());
+    super(
+        new SyncLogDequeSerializer(header.nodeIdentifier),
+        logApplier,
+        Integer.toString(header.getNodeIdentifier()));
     this.partitionTable = partitionTable;
     this.factory = factory;
     this.thisNode = thisNode;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index 322b541..be05af4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -31,10 +31,10 @@ import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.tsfile.utils.RamUsageEstimator;
 
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,11 +44,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public abstract class RaftLogManager {
@@ -140,19 +138,10 @@ public abstract class RaftLogManager {
     this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
 
     this.deleteLogExecutorService =
-        new ScheduledThreadPoolExecutor(
-            1,
-            new BasicThreadFactory.Builder()
-                .namingPattern("raft-log-delete-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newScheduledThreadPoolWithDaemon(1, "raft-log-delete-" + name);
 
     this.checkLogApplierExecutorService =
-        Executors.newSingleThreadExecutor(
-            new BasicThreadFactory.Builder()
-                .namingPattern("check-log-applier-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
 
     /** deletion check period of the submitted log */
     int logDeleteCheckIntervalSecond =
@@ -763,11 +752,7 @@ public abstract class RaftLogManager {
     this.blockAppliedCommitIndex = -1;
     this.blockedUnappliedLogList = new CopyOnWriteArrayList<>();
     this.checkLogApplierExecutorService =
-        Executors.newSingleThreadExecutor(
-            new BasicThreadFactory.Builder()
-                .namingPattern("check-log-applier-" + name)
-                .daemon(true)
-                .build());
+        IoTDBThreadPoolFactory.newSingleThreadExecutorWithDaemon("check-log-applier-" + name);
     this.checkLogApplierFuture = checkLogApplierExecutorService.submit(this::checkAppliedLogIndex);
     for (int i = 0; i < logUpdateConditions.length; i++) {
       logUpdateConditions[i] = new Object();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index f6b401f..b4e920c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -1281,6 +1281,9 @@ public class CMManager extends MManager {
       List<PartialPath> originalPaths) {
     ConcurrentSkipListSet<PartialPath> fullPaths = new ConcurrentSkipListSet<>();
     ConcurrentSkipListSet<PartialPath> nonExistPaths = new ConcurrentSkipListSet<>();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService getAllPathsService =
         Executors.newFixedThreadPool(metaGroupMember.getPartitionTable().getGlobalGroups().size());
     for (PartialPath pathStr : originalPaths) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 9884385..78892cdf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -200,7 +200,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
     if (groupPathMap.isEmpty()) {
       return result.get();
     }
-
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService remoteQueryThreadPool = Executors.newFixedThreadPool(groupPathMap.size());
     List<Future<Void>> remoteFutures = new ArrayList<>();
     // query each data group separately
@@ -301,6 +303,10 @@ public class ClusterPlanExecutor extends PlanExecutor {
       throws MetadataException {
 
     ConcurrentSkipListSet<PartialPath> nodeSet = new ConcurrentSkipListSet<>();
+
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
     List<Future<Void>> futureList = new ArrayList<>();
@@ -319,6 +325,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
                 return null;
               }));
     }
+    // TODO: there seems to be a long-running block here.
     waitForThreadPool(futureList, pool, "getNodesList()");
     return new ArrayList<>(nodeSet);
   }
@@ -398,6 +405,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
   protected Set<String> getNodeNextChildren(PartialPath path) throws MetadataException {
     ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
     List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
     List<Future<Void>> futureList = new ArrayList<>();
     for (PartitionGroup group : globalGroups) {
@@ -489,6 +499,9 @@ public class ClusterPlanExecutor extends PlanExecutor {
   @Override
   protected Set<String> getPathNextChildren(PartialPath path) throws MetadataException {
     ConcurrentSkipListSet<String> resultSet = new ConcurrentSkipListSet<>();
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService pool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
 
     List<Future<Void>> futureList = new ArrayList<>();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 585b841..933d145 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -121,7 +121,9 @@ public class ClusterPreviousFill extends PreviousFill {
     }
     CountDownLatch latch = new CountDownLatch(partitionGroups.size());
     PreviousFillHandler handler = new PreviousFillHandler(latch);
-
+    // TODO: registering and deregistering an object with JMX is not suitable for such a
+    // frequently called function.
+    // But is it suitable to create a thread pool on every call?
     ExecutorService fillService = Executors.newFixedThreadPool(partitionGroups.size());
     PreviousFillArguments arguments =
         new PreviousFillArguments(path, dataType, queryTime, beforeRange, deviceMeasurements);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 3ed7837..b4e9d5d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.PartialPath;
@@ -58,7 +59,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 public class ClusterLastQueryExecutor extends LastQueryExecutor {
@@ -67,7 +67,8 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
   private MetaGroupMember metaGroupMember;
 
   private static ExecutorService lastQueryPool =
-      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+      IoTDBThreadPoolFactory.newFixedThreadPool(
+          Runtime.getRuntime().availableProcessors(), "ClusterLastQuery");
 
   public ClusterLastQueryExecutor(LastQueryPlan lastQueryPlan, MetaGroupMember metaGroupMember) {
     super(lastQueryPlan);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
index b8a2c00..fa2a2c9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java
@@ -20,12 +20,9 @@
 package org.apache.iotdb.cluster.server;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.coordinator.Coordinator;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.service.RPCServiceThriftHandler;
 import org.apache.iotdb.db.service.ServiceType;
@@ -50,12 +47,15 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
   }
 
   @Override
-  public void initTProcessor() throws IllegalAccessException, InstantiationException {
-    try {
-      impl = new ClusterTSServiceImpl();
-      initSyncedServiceImpl(null);
-    } catch (QueryProcessException e) {
-      throw new InstantiationException(e.getMessage());
+  public void initSyncedServiceImpl(Object serviceImpl) {
+    impl = (ClusterTSServiceImpl) serviceImpl;
+    super.initSyncedServiceImpl(serviceImpl);
+  }
+
+  @Override
+  public void initTProcessor() throws InstantiationException {
+    if (impl == null) {
+      throw new InstantiationException("ClusterTSServiceImpl is null");
     }
     processor = new Processor<>(impl);
   }
@@ -100,14 +100,6 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic
     return ClusterRPCServiceHolder.INSTANCE;
   }
 
-  public void assignExecutorToServiceImpl(MetaGroupMember member) throws QueryProcessException {
-    this.impl.setExecutor(member);
-  }
-
-  public void assignCoordinator(Coordinator coordinator) {
-    this.impl.setCoordinator(coordinator);
-  }
-
   private static class ClusterRPCServiceHolder {
 
     private static final ClusterRPCService INSTANCE = new ClusterRPCService();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index c137027..ed99a45 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -36,7 +37,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -56,7 +56,7 @@ public class PullSnapshotHintService {
   }
 
   public void start() {
-    this.service = Executors.newScheduledThreadPool(1);
+    this.service = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "PullSnapshotHint");
     this.service.scheduleAtFixedRate(this::sendHints, 0, 10, TimeUnit.MILLISECONDS);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index aaba50f..0b1abca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -94,7 +95,6 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.utils.Pair;
-
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +118,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iotdb.cluster.config.ClusterConstant.THREAD_POLL_WAIT_TERMINATION_TIME_S;
@@ -184,14 +183,15 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
   }
 
   DataGroupMember(TProtocolFactory factory, PartitionGroup nodes, MetaGroupMember metaGroupMember) {
+    // The name is used in JMX, so we must avoid using the characters "(", ",", "=" and ")".
     super(
-        "Data("
+        "Data-"
             + nodes.getHeader().getNode().getInternalIp()
-            + ":"
+            + "-"
             + nodes.getHeader().getNode().getDataPort()
-            + ", raftId="
+            + "-raftId-"
             + nodes.getId()
-            + ")",
+            + "",
         new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataClient.FactorySync(factory)),
         new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
@@ -235,7 +235,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
     JMXService.registerMBean(this, mbeanName);
     super.start();
     heartBeatService.submit(new DataHeartbeatThread(this));
-    pullSnapshotService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    pullSnapshotService =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(), "pullSnapshot");
     pullSnapshotHintService = new PullSnapshotHintService(this);
     pullSnapshotHintService.start();
     resumePullSnapshotTasks();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index b3dcc7a..429459d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -96,7 +96,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -258,6 +257,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
 
     // try loading the partition table if there was a previous cluster
     this.coordinator = coordinator;
+    coordinator.linkMetaGroupMember(this);
     loadPartitionTable();
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index d714540..3bd68ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.server.member;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.async.AsyncClientPool;
 import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
@@ -64,6 +63,7 @@ import org.apache.iotdb.cluster.utils.IOUtils;
 import org.apache.iotdb.cluster.utils.PlanSerializer;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.IoTThreadFactory;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.exception.IoTDBException;
@@ -81,6 +81,7 @@ import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,7 +101,6 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -289,16 +289,12 @@ public abstract class RaftMember implements RaftMemberMBean {
   }
 
   void startBackGroundThreads() {
-    heartBeatService =
-        Executors.newSingleThreadScheduledExecutor(
-            r -> new Thread(r, name + "-HeartbeatThread@" + System.currentTimeMillis()));
-    catchUpService =
-        Executors.newCachedThreadPool(
-            new ThreadFactoryBuilder().setNameFormat(getName() + "-CatchUpThread%d").build());
+    heartBeatService = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name + "-Heartbeat");
+
+    catchUpService = IoTDBThreadPoolFactory.newCachedThreadPool(name + "-CatchUp");
     appendLogThreadPool =
-        Executors.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors() * 10,
-            new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build());
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors() * 10, name + "-AppendLog");
     if (!config.isUseAsyncServer()) {
       serialToParallelPool =
           IoTDBThreadPoolFactory.newThreadPool(
@@ -307,7 +303,8 @@ public abstract class RaftMember implements RaftMemberMBean {
               1000L,
               TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<>(),
-              new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build());
+              new IoTThreadFactory(getName() + "-SerialToParallel"),
+              getName() + "-SerialToParallel");
     }
     commitLogPool = IoTDBThreadPoolFactory.newSingleThreadExecutor("RaftCommitLog");
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
index d62024b..2a13550 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/raft/AbstractRaftService.java
@@ -19,12 +19,12 @@
 package org.apache.iotdb.cluster.server.raft;
 
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.runtime.RPCServiceException;
 import org.apache.iotdb.db.service.thrift.ThriftService;
 import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
+import org.apache.thrift.TBaseAsyncProcessor;
 
 public abstract class AbstractRaftService extends ThriftService {
 
@@ -36,7 +36,7 @@ public abstract class AbstractRaftService extends ThriftService {
       if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
         thriftServiceThread =
             new ThriftServiceThread(
-                (TSMetaService.AsyncProcessor) processor,
+                (TBaseAsyncProcessor) processor,
                 getID().getName(),
                 clientThreadName,
                 getBindIP(),
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
index b355766..5eec743 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataGroupServiceImpls.java
@@ -32,16 +32,39 @@ import org.apache.iotdb.cluster.partition.NodeRemovalResult;
 import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
-import org.apache.iotdb.cluster.rpc.thrift.*;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
+import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
+import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
+import org.apache.iotdb.cluster.rpc.thrift.GetAggrResultRequest;
+import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
+import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
+import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
+import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
+import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
+import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
+import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.StoppedMemberManager;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.utils.IOUtils;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TProtocolFactory;
@@ -62,7 +85,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class DataGroupServiceImpls
-    implements TSDataService.AsyncIface, TSDataService.Iface, DataGroupServiceImplsMBean {
+    implements TSDataService.AsyncIface, TSDataService.Iface, IService, DataGroupServiceImplsMBean {
 
   private static final Logger logger = LoggerFactory.getLogger(DataGroupServiceImpls.class);
 
@@ -94,6 +117,11 @@ public class DataGroupServiceImpls
     this.stoppedMemberManager = new StoppedMemberManager(dataMemberFactory);
   }
 
+  @Override
+  public void start() throws StartupException {
+    // No start-up work appears to be needed for this service, so this is a no-op.
+  }
+
   //  @Override
   // TODO
   public void stop() {
@@ -103,6 +131,11 @@ public class DataGroupServiceImpls
     }
   }
 
+  @Override
+  public ServiceType getID() {
+    return ServiceType.CLUSTER_DATA_ENGINE;
+  }
+
   /**
    * Add a DataGroupMember into this server, if a member with the same header exists, the old member
    * will be stopped and replaced by the new one.
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4f547a3..8114425 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 
 public class MetaAsyncService extends BaseAsyncService implements TSMetaService.AsyncIface {
-
+  private static final String ERROR_MSG_META_NOT_READY = "The metadata is not ready.";
   private static final Logger logger = LoggerFactory.getLogger(MetaAsyncService.class);
 
   private MetaGroupMember metaGroupMember;
@@ -59,11 +58,19 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
 
   @Override
   public void appendEntry(AppendEntryRequest request, AsyncMethodCallback resultHandler) {
-    if (metaGroupMember.getPartitionTable() == null) {
-      // this node lacks information of the cluster and refuse to work
-      logger.debug("This node is blind to the cluster and cannot accept logs");
-      resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
-      return;
+    // if the metaGroupMember is not ready (e.g., as a follower the PartitionTable is loaded
+    // locally, but the partition table is not verified), we do not handle the RPC requests.
+    if (!metaGroupMember.isReady()) {
+      // The only special case is that the leader sends an empty entry to let followers
+      // submit their previous logs.
+      // At this time, the partitionTable has been loaded but is not verified, so the RPC is not
+      // ready.
+      if (metaGroupMember.getPartitionTable() == null) {
+        // this node lacks information of the cluster and refuse to work
+        logger.debug("This node is blind to the cluster and cannot accept logs");
+        resultHandler.onComplete(Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE);
+        return;
+      }
     }
 
     super.appendEntry(request, resultHandler);
@@ -72,6 +79,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
   @Override
   public void addNode(
       Node node, StartUpStatus startUpStatus, AsyncMethodCallback<AddNodeResponse> resultHandler) {
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+      return;
+    }
     AddNodeResponse addNodeResponse = null;
     try {
       addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
@@ -160,6 +172,11 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
 
   @Override
   public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) {
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      resultHandler.onError(new TException(ERROR_MSG_META_NOT_READY));
+      return;
+    }
     long result;
     try {
       result = metaGroupMember.removeNode(node);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
index 541af9e..cedaf7d 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/BaseSingleNodeTest.java
@@ -19,13 +19,12 @@
 
 package org.apache.iotdb.cluster.integration;
 
+import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.Constants;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.session.Session;
-
 import org.junit.After;
 import org.junit.Before;
 
@@ -34,7 +33,7 @@ import java.util.List;
 
 public abstract class BaseSingleNodeTest {
 
-  private MetaGroupMember metaServer;
+  private ClusterIoTDB daemon;
 
   private boolean useAsyncServer;
   private List<String> seedNodeUrls;
@@ -44,24 +43,26 @@ public abstract class BaseSingleNodeTest {
   @Before
   public void setUp() throws Exception {
     initConfigs();
-    metaServer = new MetaGroupMember();
-    metaServer.start();
-    metaServer.buildCluster();
+    daemon = ClusterIoTDB.getInstance();
+    daemon.initLocalEngines();
+    daemon.activeStartNodeMode();
   }
 
   @After
   public void tearDown() throws Exception {
     // TODO fixme
-    metaServer.stop();
+    daemon.stop();
     recoverConfigs();
     EnvironmentUtils.cleanEnv();
   }
 
   private void initConfigs() {
+    // remember the original values
     useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
     seedNodeUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls();
     replicaNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum();
     autoCreateSchema = ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema();
+    // set the cluster as a single node cluster.
     ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
     ClusterDescriptor.getInstance()
         .getConfig()
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 90884f2..73eb793 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -108,7 +108,6 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
-
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol.Factory;
@@ -538,7 +537,7 @@ public class MetaGroupMemberTest extends BaseMember {
             }
           }
         };
-    metaGroupMember.getCoordinator().setMetaGroupMember(metaGroupMember);
+    metaGroupMember.getCoordinator().linkMetaGroupMember(metaGroupMember);
     metaGroupMember.setLeader(node);
     metaGroupMember.setAllNodes(allNodes);
     metaGroupMember.setCharacter(NodeCharacter.LEADER);
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
index 72dacc9..cdd31a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/IoTDBThreadPoolFactory.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.db.concurrent.threadpool.WrappedScheduledExecutorService
 import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadExecutorService;
 import org.apache.iotdb.db.concurrent.threadpool.WrappedSingleThreadScheduledExecutor;
 import org.apache.iotdb.db.concurrent.threadpool.WrappedThreadPoolExecutor;
+
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,19 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  public static ExecutorService newFixedThreadPoolWithDaemonThread(int nThreads, String poolName) {
+    logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
+
+    return new WrappedThreadPoolExecutor(
+        nThreads,
+        nThreads,
+        0L,
+        TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<>(),
+        new IoTDBDaemonThreadFactory(poolName),
+        poolName);
+  }
+
   public static ExecutorService newFixedThreadPool(
       int nThreads, String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new fixed thread pool: {}, thread number: {}", poolName, nThreads);
@@ -89,6 +103,12 @@ public class IoTDBThreadPoolFactory {
         Executors.newSingleThreadExecutor(new IoTThreadFactory(poolName)), poolName);
   }
 
+  public static ExecutorService newSingleThreadExecutorWithDaemon(String poolName) {
+    logger.info("new single thread pool: {}", poolName);
+    return new WrappedSingleThreadExecutorService(
+        Executors.newSingleThreadExecutor(new IoTDBDaemonThreadFactory(poolName)), poolName);
+  }
+
   public static ExecutorService newSingleThreadExecutor(
       String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new single thread pool: {}", poolName);
@@ -172,6 +192,14 @@ public class IoTDBThreadPoolFactory {
         Executors.newScheduledThreadPool(corePoolSize, new IoTThreadFactory(poolName)), poolName);
   }
 
+  public static ScheduledExecutorService newScheduledThreadPoolWithDaemon(
+      int corePoolSize, String poolName) {
+    logger.info("new scheduled thread pool: {}", poolName);
+    return new WrappedScheduledExecutorService(
+        Executors.newScheduledThreadPool(corePoolSize, new IoTDBDaemonThreadFactory(poolName)),
+        poolName);
+  }
+
   public static ScheduledExecutorService newScheduledThreadPool(
       int corePoolSize, String poolName, Thread.UncaughtExceptionHandler handler) {
     logger.info("new scheduled thread pool: {}", poolName);
diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 34d892c..55e99bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@ -64,6 +64,7 @@ public enum ServiceType {
   CLUSTER_DATA_HEART_BEAT_RPC_SERVICE(
       "Cluster Data Heartbeat RPC Service", "ClusterDataHeartbeatRPCService"),
   CLUSTER_META_ENGINE("Cluster Meta Engine", "ClusterMetaEngine"),
+  CLUSTER_DATA_ENGINE("Cluster Data Engine", "ClusterDataEngine"),
   ;
 
   private final String name;