Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/10 16:46:51 UTC

[iotdb] 05/07: almost pass

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4246406fab87631394c5e96003187d6f1ef13e96
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Tue Aug 10 09:51:04 2021 +0800

    almost pass
---
 .../org/apache/iotdb/cluster/ClusterIoTDB.java     | 14 ++--
 .../cluster/client/async/AsyncClientPool.java      |  5 +-
 .../iotdb/cluster/client/sync/SyncClientPool.java  | 10 ++-
 .../cluster/log/snapshot/MetaSimpleSnapshot.java   |  2 +-
 .../org/apache/iotdb/cluster/server/Response.java  |  3 +
 .../server/clusterinfo/ClusterInfoServer.java      |  1 +
 .../server/heartbeat/MetaHeartbeatThread.java      |  4 +-
 .../cluster/server/member/MetaGroupMember.java     | 79 ++++++++++++++++------
 .../cluster/server/service/MetaSyncService.java    | 32 +++++++--
 .../cluster/log/applier/DataLogApplierTest.java    |  3 +-
 .../log/snapshot/MetaSimpleSnapshotTest.java       |  2 +-
 .../cluster/query/reader/DatasourceInfoTest.java   |  1 +
 .../reader/RemoteSeriesReaderByTimestampTest.java  |  1 +
 .../query/reader/RemoteSimpleSeriesReaderTest.java |  1 +
 .../mult/AssignPathManagedMergeReaderTest.java     |  5 +-
 .../reader/mult/RemoteMultSeriesReaderTest.java    |  5 +-
 .../iotdb/cluster/server/member/BaseMember.java    |  1 +
 .../cluster/server/member/MetaGroupMemberTest.java |  1 +
 .../resources/node1conf/iotdb-engine.properties    | 12 ++--
 .../resources/node2conf/iotdb-engine.properties    | 12 ++--
 .../resources/node3conf/iotdb-engine.properties    | 12 ++--
 21 files changed, 148 insertions(+), 58 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 07aee1e..e72b5f0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -238,6 +238,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
   public void activeStartNodeMode() {
     try {
+      stopRaftInfoReport();
+
       startServerCheck();
       preStartCustomize();
 
@@ -249,8 +251,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
 
       registerManager.register(metaGroupEngine);
 
-      metaGroupEngine.buildCluster();
-
       // rpc service initialize
       if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
         MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
@@ -265,11 +265,16 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
         DataRaftService.getInstance().initSyncedServiceImpl(dataGroupEngine);
         DataRaftHeartBeatService.getInstance().initSyncedServiceImpl(dataGroupEngine);
       }
-
       // start RPC service
+      logger.info("start Meta Heartbeat RPC service... ");
       registerManager.register(MetaRaftHeartBeatService.getInstance());
+      logger.info("start Meta RPC service... ");
       registerManager.register(MetaRaftService.getInstance());
+
+      metaGroupEngine.buildCluster();
+      logger.info("start Data Heartbeat RPC service... ");
       registerManager.register(DataRaftHeartBeatService.getInstance());
+      logger.info("start Data RPC service... ");
       registerManager.register(DataRaftService.getInstance());
       // RPC based DBA API
       registerManager.register(ClusterInfoServer.getInstance());
@@ -279,8 +284,8 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
       // So that the ClusterRPCService can work.
       registerManager.register(ClusterRPCService.getInstance());
     } catch (StartupException | StartUpCheckFailureException | ConfigInconsistentException e) {
+      logger.error("Fail to start  server", e);
       stop();
-      logger.error("Fail to start meta server", e);
     }
   }
 
@@ -291,6 +296,7 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
     //      preStartCustomize();
     //      metaServer.start();
     //      metaServer.joinCluster();
+    //      dataEngine.pullSnapshots();
     //      // Currently, we do not register ClusterInfoService as a JMX Bean,
     //      // so we use startService() rather than start()
     //      ClusterInfoServer.getInstance().startService();
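
The net effect of the ClusterIoTDB hunks above is a stricter startup ordering: the meta Raft RPC endpoints must be listening before buildCluster() triggers the join/election, and the data-group services come up only afterwards. A minimal sketch of the resulting sequence (names taken from the diff; error handling and the commented-out restart path elided):

    stopRaftInfoReport();                 // quiet the periodic Raft report during startup
    startServerCheck();
    preStartCustomize();
    registerManager.register(metaGroupEngine);
    // meta heartbeat + RPC endpoints first, so peers can reach this node...
    registerManager.register(MetaRaftHeartBeatService.getInstance());
    registerManager.register(MetaRaftService.getInstance());
    // ...then build/join the cluster, which needs those endpoints up,
    metaGroupEngine.buildCluster();
    // and only then expose the data-group services.
    registerManager.register(DataRaftHeartBeatService.getInstance());
    registerManager.register(DataRaftService.getInstance());
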
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index bf0370f..719ab8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.iotdb.db.utils.TestOnly;
-
 import org.apache.thrift.async.TAsyncMethodCall;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +44,10 @@ public class AsyncClientPool {
   private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
   private AsyncClientFactory asyncClientFactory;
 
+  // TODO fix me: better to throw an exception if the client cannot be obtained; then we can
+  // remove this field.
+  public static boolean printStack;
+
   public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
     this.asyncClientFactory = asyncClientFactory;
     this.waitClientTimeutMS = ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 2c279c0..c6466f4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.iotdb.db.utils.TestOnly;
-
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +43,10 @@ public class SyncClientPool {
   private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
   private SyncClientFactory syncClientFactory;
 
+  // TODO fix me: better to throw an exception if the client cannot be obtained; then we can
+  // remove this field.
+  public static boolean printStack = false;
+
   public SyncClientPool(SyncClientFactory syncClientFactory) {
     this.syncClientFactory = syncClientFactory;
     this.waitClientTimeoutMS = ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS();
@@ -90,7 +93,10 @@ public class SyncClientPool {
           try {
             client = syncClientFactory.getSyncClient(clusterNode, this);
           } catch (TTransportException e) {
-            logger.error("Cannot open transport for client {}", node, e);
+            // TODO: throwing the exception here would be better.
+            if (printStack) {
+              logger.error("Cannot open transport for client {}", node, e);
+            }
             return null;
           }
           nodeClientNumMap.compute(
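
The TODO in getClient points at the cleaner design: propagate the transport failure instead of returning null and gating the log behind a static printStack flag. One hypothetical shape of that change (the unchecked exception is an assumption, not part of this commit):

    try {
      client = syncClientFactory.getSyncClient(clusterNode, this);
    } catch (TTransportException e) {
      // surface the failure; the caller decides whether to log, retry, or mark the node down
      throw new IllegalStateException("Cannot open transport for client " + clusterNode, e);
    }
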
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
index 1713426..e982d74 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshot.java
@@ -232,7 +232,7 @@ public class MetaSimpleSnapshot extends Snapshot {
         }
 
         // 4. accept partition table
-        metaGroupMember.acceptPartitionTable(snapshot.getPartitionTableBuffer(), true);
+        metaGroupMember.acceptVerifiedPartitionTable(snapshot.getPartitionTableBuffer(), true);
 
         synchronized (metaGroupMember.getLogManager()) {
           metaGroupMember.getLogManager().applySnapshot(snapshot);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
index 006eec1..387549d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/Response.java
@@ -53,6 +53,9 @@ public class Response {
  // the request is not executed locally and should be forwarded
   public static final long RESPONSE_NULL = Long.MIN_VALUE;
 
+  // the meta engine is not ready (though the partitionTable itself may already be ready)
+  public static final long RESPONSE_META_NOT_READY = -12;
+
   private Response() {
     // enum-like class
   }
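
RESPONSE_META_NOT_READY joins the other sentinel longs in this enum-like class. A caller-side sketch of how such codes are typically dispatched (the handler names here are hypothetical):

    long resp = response.getRespNum();
    if (resp == Response.RESPONSE_META_NOT_READY) {
      // the peer's partition table may be loaded but not yet verified; retry later
      scheduleRetry(request);                  // hypothetical
    } else if (resp == Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE) {
      resendPartitionTable(peer);              // hypothetical
    }
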
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java
index bf08e7d..39619b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/clusterinfo/ClusterInfoServer.java
@@ -49,6 +49,7 @@ public class ClusterInfoServer extends ThriftService implements ClusterInfoServe
 
   @Override
   public void initTProcessor() {
+    initSyncedServiceImpl(null);
     serviceImpl = new ClusterInfoServiceImpl();
     processor = new Processor<>(serviceImpl);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
index 4036244..137330a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThread.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.cluster.server.heartbeat;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,6 +84,9 @@ public class MetaHeartbeatThread extends HeartbeatThread {
       localMetaMember
           .getAppendLogThreadPool()
           .submit(() -> localMetaMember.processEmptyContentLog());
+      // There is a risk here: (1) a task is put into a pool, and (2) the task puts more
+      // sub-tasks into the same pool, while the parent task can only terminate after all
+      // sub-tasks finish.
     }
   }
 }
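
The comment added to MetaHeartbeatThread describes a classic thread-pool starvation hazard. A self-contained java.util.concurrent sketch of the failure mode it warns about (not IoTDB code):

    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<?> parent = pool.submit(() -> {
      Future<?> child = pool.submit(() -> { /* sub-task */ });
      return child.get(); // blocks forever: the pool's only worker is running the parent
    });

With a bounded pool, the parent occupies a worker while waiting on a child that is queued behind it, so neither can make progress.
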
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7b66e4d..53fbfc8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -95,7 +95,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransportException;
@@ -211,6 +210,17 @@ public class MetaGroupMember extends RaftMember implements IService {
     return router;
   }
 
+  public boolean isReady() {
+    return ready;
+  }
+
+  public void setReady(boolean ready) {
+    this.ready = ready;
+  }
+
+  // whether the MetaEngine is ready.
+  boolean ready = false;
+
   @TestOnly
   public MetaGroupMember() {}
 
@@ -397,8 +407,12 @@ public class MetaGroupMember extends RaftMember implements IService {
       initIdNodeMap();
       router = new ClusterPlanRouter(partitionTable);
       this.coordinator.setRouter(router);
-      startSubServers();
+      rebuildDataGroups();
+      ready = true;
     }
+    // Otherwise, we have to wait until the meta group elects a leader and the leader
+    // confirms the correct PartitionTable; only then can we mark the meta group engine
+    // as ready.
   }
 
   private void threadTaskInit() {
@@ -502,8 +516,9 @@ public class MetaGroupMember extends RaftMember implements IService {
     } else if (resp.getRespNum() == Response.RESPONSE_AGREE) {
       logger.info("Node {} admitted this node into the cluster", node);
       ByteBuffer partitionTableBuffer = resp.partitionTableBytes;
-      acceptPartitionTable(partitionTableBuffer, true);
-      getDataGroupEngine().pullSnapshots();
+      acceptVerifiedPartitionTable(partitionTableBuffer, true);
+      // TODO: this should be called in ClusterIoTDB
+      // getDataGroupEngine().pullSnapshots();
       return true;
     } else if (resp.getRespNum() == Response.RESPONSE_IDENTIFIER_CONFLICT) {
       logger.info(
@@ -552,9 +567,11 @@ public class MetaGroupMember extends RaftMember implements IService {
   }
 
   /**
-   * Process the heartbeat request from a valid leader. Generate and tell the leader the identifier
-   * of the node if necessary. If the partition table is missing, use the one from the request or
-   * require it in the response.
+   * This is the behavior of a follower:
+   *
+   * <p>Process the heartbeat request from a valid leader. Generate and tell the leader the
+   * identifier of the node if necessary. If the partition table is missing, use the one from the
+   * request or require it in the response. TODO: this should move to the RPC service layer.
    */
   @Override
   void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {
@@ -575,7 +592,7 @@ public class MetaGroupMember extends RaftMember implements IService {
           // if the leader has sent the partition table then accept it
           if (partitionTable == null) {
             ByteBuffer byteBuffer = request.partitionTableBytes;
-            acceptPartitionTable(byteBuffer, true);
+            acceptVerifiedPartitionTable(byteBuffer, true);
           }
         }
       } else {
@@ -590,10 +607,11 @@ public class MetaGroupMember extends RaftMember implements IService {
    * Deserialize a partition table from the buffer, save it locally, add nodes from the partition
    * table and start DataClusterServer and ClusterTSServiceImpl.
    */
-  public synchronized void acceptPartitionTable(
+  protected synchronized void acceptPartitionTable(
       ByteBuffer partitionTableBuffer, boolean needSerialization) {
     SlotPartitionTable newTable = new SlotPartitionTable(thisNode);
     newTable.deserialize(partitionTableBuffer);
+
     // avoid overwriting current partition table with a previous one
     if (partitionTable != null) {
       long currIndex = partitionTable.getLastMetaLogIndex();
@@ -618,7 +636,20 @@ public class MetaGroupMember extends RaftMember implements IService {
 
     updateNodeList(newTable.getAllNodes());
 
-    startSubServers();
+    // we cannot start the data group engine here,
+    // because the partitionTable has not been verified yet.
+    // TODO
+    //    restartSubServers();
+  }
+
+  // this is the behavior of the follower
+  public synchronized void acceptVerifiedPartitionTable(
+      ByteBuffer partitionTableBuffer, boolean needSerialization) {
+    logger.info("new Partition Table is received.");
+    acceptPartitionTable(partitionTableBuffer, needSerialization);
+    rebuildDataGroups();
+    logger.info("The Meta Engine is ready");
+    ready = true;
   }
 
   private void updateNodeList(Collection<Node> nodes) {
@@ -632,11 +663,13 @@ public class MetaGroupMember extends RaftMember implements IService {
   }
 
   /**
-   * Process a HeartBeatResponse from a follower. If the follower has provided its identifier, try
-   * registering for it and if all nodes have registered and there is no available partition table,
-   * initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the follower
-   * requires a partition table, add it to the blind node list so that at the next heartbeat this
-   * node will send it a partition table
+   * This is the behavior of the Leader:
+   *
+   * <p>Process a HeartBeatResponse from a follower. If the follower has provided its identifier,
+   * try registering for it and if all nodes have registered and there is no available partition
+   * table, initialize a new one and start the ClusterTSServiceImpl and DataClusterServer. If the
+   * follower requires a partition table, add it to the blind node list so that at the next
+   * heartbeat this node will send it a partition table
    */
   @Override
   public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {
@@ -647,6 +680,8 @@ public class MetaGroupMember extends RaftMember implements IService {
       registerNodeIdentifier(response.getFollower(), response.getFollowerIdentifier());
       // if all nodes' ids are known, we can build the partition table
       if (allNodesIdKnown()) {
+        // Notice that this should only be called once.
+
         // When the meta raft group is established, the follower reports its node information to the
         // leader through the first heartbeat. After the leader knows the node information of all
         // nodes, it can replace the incomplete node information previously saved locally, and build
@@ -658,7 +693,9 @@ public class MetaGroupMember extends RaftMember implements IService {
         }
         router = new ClusterPlanRouter(partitionTable);
         this.coordinator.setRouter(router);
-        startSubServers();
+        rebuildDataGroups();
+        logger.info("The Meta Engine is ready");
+        this.ready = true;
       }
     }
     // record the requirement of partition table of the follower
@@ -672,7 +709,7 @@ public class MetaGroupMember extends RaftMember implements IService {
    * the next heartbeat the partition table will be sent to the node.
    */
   private void addBlindNode(Node node) {
-    logger.debug("Node {} requires the node list", node);
+    logger.debug("Node {} requires the node list (partition table)", node);
     blindNodes.add(node);
   }
 
@@ -722,7 +759,7 @@ public class MetaGroupMember extends RaftMember implements IService {
    * Start the DataClusterServer and ClusterTSServiceImpl` so this node can serve other nodes and
    * clients. Also build DataGroupMembers using the partition table.
    */
-  protected synchronized void startSubServers() {
+  protected synchronized void rebuildDataGroups() {
     logger.info("Starting sub-servers...");
     synchronized (partitionTable) {
       try {
@@ -739,7 +776,7 @@ public class MetaGroupMember extends RaftMember implements IService {
   }
 
   /** When the node restarts, it sends handshakes to all other nodes so they may know it is back. */
-  private void sendHandshake() {
+  public void sendHandshake() {
     for (Node node : allNodes) {
       if (ClusterUtils.nodeEqual(node, thisNode)) {
         // no need to shake hands with yourself
@@ -1058,7 +1095,7 @@ public class MetaGroupMember extends RaftMember implements IService {
       inconsistentNum.set(0);
       checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
       logger.debug(
-          "Status check result: {}-{}/{}",
+          "Status check result: consistent nodes: {}, inconsistent nodes: {}, total nodes: {}",
           consistentNum.get(),
           inconsistentNum.get(),
           getAllNodes().size());
@@ -1078,6 +1115,7 @@ public class MetaGroupMember extends RaftMember implements IService {
         }
       }
     }
+    // after checking, we enable printing the error stack in 'SyncClientPool.getClient'
   }
 
   // TODO rewrite this method.
@@ -1194,6 +1232,7 @@ public class MetaGroupMember extends RaftMember implements IService {
       }
 
       ByteBuffer wrap = ByteBuffer.wrap(tableBuffer);
+      logger.info("Load Partition Table locally.");
       acceptPartitionTable(wrap, false);
 
       logger.info("Load {} nodes: {}", allNodes.size(), allNodes);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index d0fe5d1..065b663 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
-
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,18 +59,33 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
 
   @Override
   public long appendEntry(AppendEntryRequest request) throws TException {
-    if (metaGroupMember.getPartitionTable() == null) {
-      // this node lacks information of the cluster and refuse to work
-      logger.debug("This node is blind to the cluster and cannot accept logs");
-      return Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE;
+    // if the metaGroupMember is not ready (e.g., as a follower, the PartitionTable is loaded
+    // locally but not yet verified), we do not handle RPC requests.
+    if (!metaGroupMember.isReady()) {
+      // the only special case is that the leader will send an empty entry to let followers
+      // submit the previous logs;
+      // at this time, the partitionTable has been loaded but is not verified, so the RPC is
+      // not ready.
+      if (metaGroupMember.getPartitionTable() == null) {
+        // this node lacks information of the cluster and refuses to work
+        logger.debug("This node is blind to the cluster and cannot accept logs, {}", request);
+        return Response.RESPONSE_PARTITION_TABLE_UNAVAILABLE;
+      }
     }
 
     return super.appendEntry(request);
   }
 
+  private static final String ERROR_MSG_META_NOT_READY = "The meta engine is not ready.";
+
   @Override
   public AddNodeResponse addNode(Node node, StartUpStatus startUpStatus) throws TException {
     AddNodeResponse addNodeResponse;
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      throw new TException(ERROR_MSG_META_NOT_READY);
+    }
+
     try {
       addNodeResponse = metaGroupMember.addNode(node, startUpStatus);
     } catch (AddSelfException | LogExecutionException | CheckConsistencyException e) {
@@ -98,6 +112,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
 
   @Override
   public void sendSnapshot(SendSnapshotRequest request) throws TException {
+    // even if the meta engine is not ready, we still need to catch up.
     try {
       metaGroupMember.receiveSnapshot(request);
     } catch (Exception e) {
@@ -107,6 +122,7 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
 
   @Override
   public CheckStatusResponse checkStatus(StartUpStatus startUpStatus) {
+    // this method is called before the meta engine is ready, so there is no readiness guard.
     return ClusterUtils.checkStatus(startUpStatus, metaGroupMember.getStartUpStatus());
   }
 
@@ -149,11 +165,17 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
 
   @Override
   public ByteBuffer collectMigrationStatus() {
+    // TODO: not sure whether this can be called before the meta engine is ready
     return ClusterUtils.serializeMigrationStatus(metaGroupMember.collectMigrationStatus());
   }
 
   @Override
   public long removeNode(Node node) throws TException {
+    if (!metaGroupMember.isReady()) {
+      logger.debug(ERROR_MSG_META_NOT_READY);
+      throw new TException(ERROR_MSG_META_NOT_READY);
+    }
+
     long result;
     try {
       result = metaGroupMember.removeNode(node);
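
The readiness guard added to addNode and removeNode above is the same three lines in both places; a hypothetical refactoring that consolidates it (the helper is not part of this commit):

    private void ensureMetaReady() throws TException {
      if (!metaGroupMember.isReady()) {
        logger.debug(ERROR_MSG_META_NOT_READY);
        throw new TException(ERROR_MSG_META_NOT_READY);
      }
    }

    @Override
    public long removeNode(Node node) throws TException {
      ensureMetaReady();
      // ... existing body unchanged
    }
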
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 55b24da..122b7c5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.cluster.log.applier;
 
-import junit.framework.TestCase;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
@@ -74,6 +73,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import junit.framework.TestCase;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
index 789f968..470bbc3 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/MetaSimpleSnapshotTest.java
@@ -62,7 +62,7 @@ public class MetaSimpleSnapshotTest extends IoTDBTest {
     metaGroupMember =
         new TestMetaGroupMember() {
           @Override
-          protected void startSubServers() {
+          protected void rebuildDataGroups() {
             subServerInitialized = true;
           }
         };
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
index 09a3354..451e78b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index 1af7d57..de4d588 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.query.control.QueryResourceManager;
 import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index bf07b21..d3e6eca 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
index 6c08e3d..31a846f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.cluster.query.reader.mult;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
@@ -40,6 +38,9 @@ import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
index a28a810..784e203 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.cluster.query.reader.mult;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.iotdb.cluster.ClusterIoTDB;
 import org.apache.iotdb.cluster.client.DataClientProvider;
 import org.apache.iotdb.cluster.client.async.AsyncDataClient;
@@ -41,6 +39,9 @@ import org.apache.iotdb.db.utils.SerializeUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.TimeValuePair;
 import org.apache.iotdb.tsfile.read.common.BatchData;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index 60fbe31..44c58fb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.RegisterManager;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.SchemaUtils;
+
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index c31d891..90884f2 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -108,6 +108,7 @@ import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.ValueFilter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol.Factory;
diff --git a/cluster/src/test/resources/node1conf/iotdb-engine.properties b/cluster/src/test/resources/node1conf/iotdb-engine.properties
index a386335..afd8911 100644
--- a/cluster/src/test/resources/node1conf/iotdb-engine.properties
+++ b/cluster/src/test/resources/node1conf/iotdb-engine.properties
@@ -16,12 +16,12 @@
 #under the License.
 
 
-base_dir=node1/tmp
-data_dirs=node1/data
-wal_dir=node1/wal
-index_root_dir=node1/index
-udf_root_dir=node1/ext
-tracing_dir=node1/data/tracing
+base_dir=target/node1/tmp
+data_dirs=target/node1/data
+wal_dir=target/node1/wal
+index_root_dir=target/node1/index
+udf_root_dir=target/node1/ext
+tracing_dir=target/node1/data/tracing
 
 rpc_port=6667
 metrics_port=8181
diff --git a/cluster/src/test/resources/node2conf/iotdb-engine.properties b/cluster/src/test/resources/node2conf/iotdb-engine.properties
index c9276c0..ec3b39b 100644
--- a/cluster/src/test/resources/node2conf/iotdb-engine.properties
+++ b/cluster/src/test/resources/node2conf/iotdb-engine.properties
@@ -16,12 +16,12 @@
 #under the License.
 
 
-base_dir=node2/tmp
-data_dirs=node2/data
-wal_dir=node2/wal
-index_root_dir=node2/index
-udf_root_dir=node2/ext
-tracing_dir=node2/data/tracing
+base_dir=target/node2/tmp
+data_dirs=target/node2/data
+wal_dir=target/node2/wal
+index_root_dir=target/node2/index
+udf_root_dir=target/node2/ext
+tracing_dir=target/node2/data/tracing
 
 rpc_port=6669
 metrics_port=8182
diff --git a/cluster/src/test/resources/node3conf/iotdb-engine.properties b/cluster/src/test/resources/node3conf/iotdb-engine.properties
index 04b1512..6eeb6af 100644
--- a/cluster/src/test/resources/node3conf/iotdb-engine.properties
+++ b/cluster/src/test/resources/node3conf/iotdb-engine.properties
@@ -16,12 +16,12 @@
 #under the License.
 
 
-base_dir=node3/tmp
-data_dirs=node3/data
-wal_dir=node3/wal
-index_root_dir=node3/index
-udf_root_dir=node3/ext
-tracing_dir=node3/data/tracing
+base_dir=target/node3/tmp
+data_dirs=target/node3/data
+wal_dir=target/node3/wal
+index_root_dir=target/node3/index
+udf_root_dir=target/node3/ext
+tracing_dir=target/node3/data/tracing
 
 rpc_port=6671
 metrics_port=8183