Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 04:23:34 UTC

[iotdb] branch cluster_node_management created (now 6d25ba5)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cluster_node_management
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 6d25ba5  refactor node management

This branch includes the following new commits:

     new 6d25ba5  refactor node management

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: refactor node management

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6d25ba5c2b642c4e5d3977eeb08b17f5646b8556
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 12:20:58 2020 +0800

    refactor node management
---
 .../cluster/client/async/AsyncClientPool.java      |  80 +++++---------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  24 ++++-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |   4 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |   4 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |   2 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   2 +-
 .../cluster/query/manage/QueryCoordinator.java     | 115 ++-------------------
 .../iotdb/cluster/server/DataClusterServer.java    |   2 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  10 ++
 .../handlers/caller/AppendNodeEntryHandler.java    |   6 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../cluster/server/member/DataGroupMember.java     |  13 +--
 .../cluster/server/member/MetaGroupMember.java     |  48 +++++++--
 .../iotdb/cluster/server/member/RaftMember.java    |  78 +++++---------
 .../cluster/server/{ => monitor}/NodeReport.java   |   3 +-
 .../manage => server/monitor}/NodeStatus.java      |  42 +++++++-
 .../monitor/NodeStatusManager.java}                |  85 ++++++++-------
 .../iotdb/cluster/server/{ => monitor}/Peer.java   |   2 +-
 .../iotdb/cluster/server/{ => monitor}/Timer.java  |   2 +-
 .../cluster/server/service/MetaAsyncService.java   |   6 ++
 .../cluster/server/service/MetaSyncService.java    |   5 +
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   5 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |   2 +-
 .../apache/iotdb/cluster/query/BaseQueryTest.java  |   5 +-
 .../cluster/query/manage/QueryCoordinatorTest.java |  10 +-
 .../caller/AppendNodeEntryHandlerTest.java         |   2 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   3 +-
 thrift/src/main/thrift/cluster.thrift              |   7 ++
 29 files changed, 262 insertions(+), 309 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 4127af7..be96b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -24,13 +24,10 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.thrift.async.TAsyncMethodCall;
 import org.slf4j.Logger;
@@ -43,34 +40,22 @@ public class AsyncClientPool {
   private int maxConnectionForEachNode;
   private Map<ClusterNode, Deque<AsyncClient>> clientCaches = new ConcurrentHashMap<>();
   private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
-  private Map<ClusterNode, Integer> nodeErrorClientCountMap = new ConcurrentHashMap<>();
   private AsyncClientFactory asyncClientFactory;
-  private ScheduledExecutorService cleanErrorClientExecutorService;
-  // when set to true, if MAX_ERROR_COUNT errors occurs continuously when connecting to node, any
-  // further requests to the node will be rejected for PROBE_NODE_STATUS_PERIOD_SECOND
-  // heartbeats should not be blocked
-  private boolean blockOnError;
-
-  private static final int MAX_ERROR_COUNT = 3;
-  private static final int PROBE_NODE_STATUS_PERIOD_SECOND = 60;
 
   public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
-    this(asyncClientFactory, true);
-  }
-
-  public AsyncClientPool(AsyncClientFactory asyncClientFactory, boolean blockOnError) {
     this.asyncClientFactory = asyncClientFactory;
     this.maxConnectionForEachNode =
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
-    this.blockOnError = blockOnError;
-    if (blockOnError) {
-      this.cleanErrorClientExecutorService = new ScheduledThreadPoolExecutor(1,
-          new BasicThreadFactory.Builder().namingPattern("clean-error-client-%d").daemon(true)
-              .build());
-      this.cleanErrorClientExecutorService
-          .scheduleAtFixedRate(this::cleanErrorClients, PROBE_NODE_STATUS_PERIOD_SECOND,
-              PROBE_NODE_STATUS_PERIOD_SECOND, TimeUnit.SECONDS);
-    }
+  }
+
+  /**
+   * Convenience overload of getClient(Node, boolean) with activatedOnly set to true.
+   * @param node the node to connect to
+   * @return a client of the node, or null if it is deactivated or unavailable
+   * @throws IOException if the node cannot be connected
+   */
+  public AsyncClient getClient(Node node) throws IOException {
+    return getClient(node, true);
   }
 
   /**
@@ -79,20 +64,24 @@ public class AsyncClientPool {
    * IMPORTANT!!! The caller should check whether the return value is null or not!
    *
   * @param node the node to connect to
+   * @param activatedOnly if true, only return a client when the node's NodeStatus.isActivated is
+   *                      true, which avoids unnecessary waits for nodes that are already down;
+   *                      heartbeat attempts should always try to connect so the node can be
+   *                      reactivated as soon as possible
   * @return the client if the node can be connected, otherwise null
   * @throws IOException if the node cannot be connected
    */
-  public AsyncClient getClient(Node node) throws IOException {
+  public AsyncClient getClient(Node node, boolean activatedOnly) throws IOException {
     ClusterNode clusterNode = new ClusterNode(node);
-    if (blockOnError && nodeErrorClientCountMap.getOrDefault(clusterNode, 0) > MAX_ERROR_COUNT) {
-      throw new IOException(String.format("connect node failed, maybe the node is down, %s", node));
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
     }
 
     AsyncClient client;
+    // As clientCaches is a ConcurrentHashMap, computeIfAbsent is thread-safe.
+    Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
+        n -> new ArrayDeque<>());
     synchronized (this) {
-      //As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
-      Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
-          n -> new ArrayDeque<>());
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -185,38 +174,15 @@ public class AsyncClientPool {
           ((AsyncMetaClient) client).close();
         }
       }
-      clientStack.clear();
       nodeClientNumMap.put(clusterNode, 0);
       this.notifyAll();
-    }
-    if (!blockOnError) {
-      return;
-    }
-    synchronized (this) {
-      if (nodeErrorClientCountMap.containsKey(clusterNode)) {
-        nodeErrorClientCountMap.put(clusterNode, nodeErrorClientCountMap.get(clusterNode) + 1);
-      } else {
-        nodeErrorClientCountMap.put(clusterNode, 1);
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("the node={}, connect error times={}", clusterNode,
-          nodeErrorClientCountMap.get(clusterNode));
+      NodeStatusManager.getINSTANCE().deactivate(node);
     }
   }
 
   @SuppressWarnings("squid:S1135")
   void onComplete(Node node) {
-    ClusterNode clusterNode = new ClusterNode(node);
-    // TODO: if the heartbeat client pool completes, also unblock another pool
-    nodeErrorClientCountMap.remove(clusterNode);
-  }
-
-  void cleanErrorClients() {
-    synchronized (this) {
-      nodeErrorClientCountMap.clear();
-      logger.debug("clean all error clients");
-    }
+    NodeStatusManager.getINSTANCE().activate(node);
   }
 
   void recreateClient(Node node) {
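
The net effect of the pool change above is that ordinary RPC paths fail fast
for nodes already marked down, while heartbeat paths bypass the gate. A
minimal caller-side sketch, assuming the pool API in this diff (the variable
names and control flow are illustrative, not part of the commit):

    // Ordinary RPC: the one-argument overload uses activatedOnly == true, so
    // a deactivated node yields null immediately instead of waiting on a
    // connect timeout. Both overloads may throw IOException.
    AsyncClient client = asyncClientPool.getClient(node);
    if (client == null) {
      return; // node is down or the pool is exhausted; skip it for now
    }

    // Heartbeat path: pass activatedOnly == false so the node is still probed
    // and can be reactivated as soon as it comes back.
    AsyncClient heartbeatClient = asyncClientPool.getClient(node, false);
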
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index c9c3e78..d91adb0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -49,15 +50,31 @@ public class SyncClientPool {
   }
 
   /**
+   * Convenience overload of getClient(Node, boolean) with activatedOnly set to true.
+   * @param node the node to connect to
+   * @return a client of the node, or null if it is deactivated or unavailable
+   */
+  public Client getClient(Node node) {
+    return getClient(node, true);
+  }
+
+  /**
    * Get a client of the given node from the cache if one is available, or create a new one.
    * <p>
    * IMPORTANT!!! The caller should check whether the return value is null or not!
    *
-   * @param node the node want to connect
+   * @param node          the node to connect to
+   * @param activatedOnly if true, only return a client when NodeStatus.isActivated is true, which
+   *                      avoids unnecessary waits for nodes that are already down; heartbeat
+   *                      attempts should always try to connect so the node can be reactivated ASAP
   * @return the client if the node can be connected, otherwise null
    */
-  public Client getClient(Node node) {
+  public Client getClient(Node node, boolean activatedOnly) {
     ClusterNode clusterNode = new ClusterNode(node);
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
+    }
+
     //As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
     synchronized (this) {
@@ -111,12 +128,15 @@ public class SyncClientPool {
     synchronized (this) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
+        NodeStatusManager.getINSTANCE().activate(node);
       } else {
         try {
           clientStack.push(syncClientFactory.getSyncClient(node, this));
+          NodeStatusManager.getINSTANCE().activate(node);
         } catch (TTransportException e) {
           logger.error("Cannot open transport for client {}", node, e);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1);
+          NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
       this.notifyAll();
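
Note that the method above (putClient in SyncClientPool) now doubles as a
liveness signal: handing back a client with an open transport, or
successfully rebuilding one, re-activates the node, while a failed rebuild
deactivates it. A usage sketch, assuming putClient(Node, Client) and with the
RPC left as a placeholder (illustrative, not part of the commit):

    Client client = syncClientPool.getClient(node);
    if (client == null) {
      return; // node deactivated or pool exhausted
    }
    try {
      // ... perform the thrift RPC with 'client' here ...
    } finally {
      // returning the client marks the node activated (open transport) or
      // deactivated (transport could not be rebuilt)
      syncClientPool.putClient(node, client);
    }
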
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 21820d5..a813fa4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index bfe57a1..e1f7c37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..8a266ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..21b9dbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index e592a87..91d193a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -19,24 +19,12 @@
 
 package org.apache.iotdb.cluster.query.manage;
 
-import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 
 /**
  * QueryCoordinator records the spec and load of each node, deciding the order of replicas that
@@ -44,17 +32,12 @@ import org.slf4j.LoggerFactory;
  */
 public class QueryCoordinator {
 
-  private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
-
   // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
   private static final QueryCoordinator INSTANCE = new QueryCoordinator();
+  private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
 
   private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
 
-  private MetaGroupMember metaGroupMember;
-  private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
-
 
   private QueryCoordinator() {
     // singleton class
@@ -64,13 +47,9 @@ public class QueryCoordinator {
     return INSTANCE;
   }
 
-  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-  }
-
   /**
    * Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
-   * or load) are placed first. This won't change the order of the origin list.
+   * or load) are placed first. This won't change the order of the original list.
    *
   * @param nodes the candidate replica nodes
   * @return a new list containing the same nodes, ordered by status
@@ -81,89 +60,7 @@ public class QueryCoordinator {
     return reordered;
   }
 
-  private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
-    TNodeStatus status = null;
-    AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
-    if (asyncMetaClient == null) {
-      return null;
-    }
-    try {
-      status = SyncClientAdaptor.queryNodeStatus(asyncMetaClient);
-    } catch (TException e) {
-      if (e.getCause() instanceof ConnectException) {
-        logger.warn("Cannot query the node status of {}: {}", node, e.getCause());
-      } else {
-        logger.error("query node status failed {}", node, e);
-      }
-      return null;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      logger.error("Cannot query the node status of {}", node, e);
-      return null;
-    }
-    return status;
-  }
-
-  private TNodeStatus getNodeStatusWithSyncServer(Node node) {
-    TNodeStatus status = null;
-    SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
-    if (syncMetaClient == null) {
-      logger.error("Cannot query the node status of {} for no available client", node);
-      return null;
-    }
-    try {
-      status = syncMetaClient.queryNodeStatus();
-    } catch (TException e) {
-      syncMetaClient.getInputProtocol().getTransport().close();
-      logger.error("Cannot query the node status of {}", node, e);
-      return null;
-    } finally {
-      ClientUtils.putBackSyncClient(syncMetaClient);
-    }
-    return status;
-  }
-
-  private NodeStatus getNodeStatus(Node node) {
-    // avoid duplicated computing of concurrent queries
-    NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
-    if (node.equals(metaGroupMember.getThisNode())) {
-      return nodeStatus;
-    }
-
-    long currTime = System.currentTimeMillis();
-    if (nodeStatus.getStatus() != null
-        && currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
-      return nodeStatus;
-    }
-
-    long startTime = System.nanoTime();
-    TNodeStatus status = null;
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      status = getNodeStatusWithAsyncServer(node);
-    } else {
-      status = getNodeStatusWithSyncServer(node);
-    }
-    long responseTime = System.nanoTime() - startTime;
-
-    if (status != null) {
-      nodeStatus.setStatus(status);
-      nodeStatus.setLastUpdateTime(System.currentTimeMillis());
-      nodeStatus.setLastResponseLatency(responseTime);
-    } else {
-      nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
-    }
-    logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
-        nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
-    return nodeStatus;
-  }
-
-  public long getLastResponseLatency(Node node) {
-    NodeStatus nodeStatus = getNodeStatus(node);
-    return nodeStatus.getLastResponseLatency();
-  }
-
-  @TestOnly
-  public void clear() {
-    nodeStatusMap.clear();
+  public NodeStatus getNodeStatus(Node node) {
+    return STATUS_MANAGER.getNodeStatus(node, true);
   }
 }
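
QueryCoordinator is thus reduced to the replica-ordering concern, delegating
all status bookkeeping to NodeStatusManager. A usage sketch under the API
above ('replicas' is an illustrative variable):

    // Reorder replicas so the lowest-latency / least-loaded node comes first;
    // the original list is left untouched.
    List<Node> ordered = QueryCoordinator.getINSTANCE().reorderNodes(replicas);
    Node preferred = ordered.get(0);
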
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..6e5bca6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -59,7 +59,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Processor;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..98ae174 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -343,4 +343,14 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
       AsyncMethodCallback<Void> resultHandler) {
     asyncService.removeHardLink(hardLinkPath, resultHandler);
   }
+
+  @Override
+  public void handshake(Node sender) {
+    syncService.handshake(sender);
+  }
+
+  @Override
+  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+    asyncService.handshake(sender, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 8e6b6ce..f82f24e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 433c9d7..7a7bab0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -24,7 +24,7 @@ import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import java.net.ConnectException;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..4af23b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -77,12 +77,13 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.PullSnapshotHintService;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -160,7 +161,7 @@ public class DataGroupMember extends RaftMember {
     super("Data(" + nodes.getHeader().getIp() + ":" + nodes.getHeader().getMetaPort() + ")",
         new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataClient.FactorySync(factory)),
-        new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory), false),
+        new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
         new AsyncClientPool(new SingleManagerFactory(factory)));
     this.thisNode = thisNode;
@@ -768,7 +769,7 @@ public class DataGroupMember extends RaftMember {
     return new DataMemberReport(character, leader.get(), term.get(),
         logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(),
         logManager.getCommitLogTerm(), getHeader(), readOnly,
-        QueryCoordinator.getINSTANCE()
+        NodeStatusManager.getINSTANCE()
             .getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex,
         logManager.getMaxHaveAppliedCommitIndex());
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7d2b776..d36816c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -88,7 +88,6 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.ClusterPlanRouter;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -105,17 +104,18 @@ import org.apache.iotdb.cluster.server.ClientServer;
 import org.apache.iotdb.cluster.server.DataClusterServer;
 import org.apache.iotdb.cluster.server.HardLinkCleaner;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport;
-import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
 import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatThread;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.MetaMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -271,7 +271,7 @@ public class MetaGroupMember extends RaftMember {
   public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
     super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
-        new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+        new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
     allNodes = new ArrayList<>();
     initPeerMap();
@@ -335,7 +335,7 @@ public class MetaGroupMember extends RaftMember {
       return;
     }
     addSeedNodes();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(this);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(this);
     super.start();
   }
 
@@ -783,6 +783,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         getDataClusterServer().buildDataGroupMembers(partitionTable);
         initSubServers();
+        sendHandshake();
       } catch (TTransportException | StartupException e) {
         logger.error("Build partition table failed: ", e);
         stop();
@@ -793,6 +794,29 @@ public class MetaGroupMember extends RaftMember {
   }
 
   /**
+   * When the node restarts, it sends handshakes to all other nodes so they may know it is back.
+   */
+  private void sendHandshake() {
+    for (Node node : allNodes) {
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncMetaClient asyncClient = (AsyncMetaClient) getAsyncClient(node);
+          if (asyncClient != null) {
+            asyncClient.handshake(thisNode, new GenericHandler<>(node, null));
+          }
+        } else {
+          SyncMetaClient syncClient = (SyncMetaClient) getSyncClient(node);
+          if (syncClient != null) {
+            syncClient.handshake(thisNode);
+          }
+        }
+      } catch (TException e) {
+        // ignore handshake exceptions
+      }
+    }
+  }
+
+  /**
    * Process the join cluster request of "node". Only proceed when the partition table is ready.
    *
    * @param node cannot be the local node
@@ -1022,7 +1046,7 @@ public class MetaGroupMember extends RaftMember {
 
   private CheckStatusResponse checkStatus(Node seedNode) {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
+      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode, false);
       if (client == null) {
         return null;
       }
@@ -1035,7 +1059,7 @@ public class MetaGroupMember extends RaftMember {
         logger.warn("Current thread is interrupted.");
       }
     } else {
-      SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode);
+      SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode, false);
       if (client == null) {
         return null;
       }
@@ -1804,7 +1828,7 @@ public class MetaGroupMember extends RaftMember {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
       throws IOException {
-    Client client = null;
+    Client client;
     try {
       client = getClientProvider().getSyncDataClient(receiver,
           RaftServer.getWriteOperationTimeoutMS());
@@ -1903,7 +1927,7 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
-  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) throws TException {
+  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     for (Node node : allNodes) {
       SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -2175,4 +2199,8 @@ public class MetaGroupMember extends RaftMember {
   public void setClientProvider(DataClientProvider dataClientProvider) {
     this.dataClientProvider = dataClientProvider;
   }
+
+  public void handleHandshake(Node sender) {
+    NodeStatusManager.getINSTANCE().activate(sender);
+  }
 }
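
The matching thrift change appears only in the diffstat above
(thrift/src/main/thrift/cluster.thrift, +7 lines). Judging from the generated
signatures used in this commit (handshake(Node sender) and its async variant),
the IDL addition is presumably a method along these lines; this is a hedged
reconstruction, not the actual file content:

    /**
     * Sent by a node when it restarts so the receiver can mark the sender
     * activated again.
     */
    void handshake(1: Node sender)
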
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 14271dc..c2443d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -73,11 +73,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -123,17 +123,6 @@ public abstract class RaftMember {
   private static long waitLeaderTimeMs = 60 * 1000L;
 
   /**
-   * when opening a client failed (connection refused), wait for a while to avoid sending useless
-   * connection requests too frequently
-   */
-  private static final long SYNC_CLIENT_TIMEOUT_MS = 1000;
-
-  /**
-   * the max retry times for get a available client
-   */
-  private static final int MAX_RETRY_TIMES_FOR_GET_CLIENT = 5;
-
-  /**
    * when the leader of this node changes, the condition will be notified so other threads that wait
    * on this may be woken.
    */
@@ -558,7 +547,7 @@ public abstract class RaftMember {
    * @return an asynchronous thrift client or null if the caller tries to connect the local node.
    */
   public AsyncClient getAsyncHeartbeatClient(Node node) {
-    return getAsyncClient(node, asyncHeartbeatClientPool);
+    return getAsyncClient(node, asyncHeartbeatClientPool, false);
   }
 
   /**
@@ -567,7 +556,7 @@ public abstract class RaftMember {
    * @return the heartbeat client for the node
    */
   public Client getSyncHeartbeatClient(Node node) {
-    return getSyncClient(syncHeartbeatClientPool, node);
+    return getSyncClient(syncHeartbeatClientPool, node, false);
   }
 
   public void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
@@ -586,28 +575,12 @@ public abstract class RaftMember {
     }
   }
 
-  private Client getSyncClient(SyncClientPool pool, Node node) {
+  private Client getSyncClient(SyncClientPool pool, Node node, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
 
-    Client client = null;
-    for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
-      client = pool.getClient(node);
-      if (client == null) {
-        // this is typically because the target server is not yet ready (connection refused), so we
-        // wait for a while before reopening the transport to avoid sending requests too frequently
-        try {
-          Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return null;
-        }
-      } else {
-        return client;
-      }
-    }
-    return client;
+    return pool.getClient(node, activatedOnly);
   }
 
   public NodeCharacter getCharacter() {
@@ -1262,35 +1235,28 @@ public abstract class RaftMember {
    * the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool);
+    return getAsyncClient(node, asyncClientPool, true);
+  }
+
+  public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
+    return getAsyncClient(node, asyncClientPool, activatedOnly);
   }
 
   public AsyncClient getSendLogAsyncClient(Node node) {
-    return getAsyncClient(node, asyncSendLogClientPool);
+    return getAsyncClient(node, asyncSendLogClientPool, true);
   }
 
-  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool) {
+  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
 
-    AsyncClient client = null;
-    for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
-      try {
-        client = pool.getClient(node);
-        if (!ClientUtils.isClientReady(client)) {
-          Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
-        } else {
-          return client;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (IOException e) {
-        logger.warn("{} cannot connect to node {}", name, node, e);
-      }
+    try {
+      return pool.getClient(node, activatedOnly);
+    } catch (IOException e) {
+      logger.warn("{} cannot connect to node {}", name, node, e);
+      return null;
     }
-    return client;
   }
 
   /**
@@ -1301,7 +1267,11 @@ public abstract class RaftMember {
    * @return the client if node is available, otherwise null
    */
   public Client getSyncClient(Node node) {
-    return getSyncClient(syncClientPool, node);
+    return getSyncClient(syncClientPool, node, true);
+  }
+
+  public Client getSyncClient(Node node, boolean activatedOnly) {
+    return getSyncClient(syncClientPool, node, activatedOnly);
   }
 
   public AtomicLong getTerm() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index e175248..9cbd35b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.rpc.RpcStat;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f748ec..4ec8857 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
@@ -28,6 +28,12 @@ import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 @SuppressWarnings("java:S1135")
 public class NodeStatus implements Comparable<NodeStatus> {
 
+  // if a node was deactivated long enough ago, it is assumed activated again: it may have
+  // restarted, but without a heartbeat between the two nodes the local node cannot learn that
+  // it is healthy again. Note that we cannot always rely on the start-up handshake, because it
+  // occurs only once and may be lost.
+  private static final long DEACTIVATION_VALID_INTERVAL_MS = 600_000L;
+
   private TNodeStatus status;
   // when is the status last updated, millisecond timestamp, to judge whether we should update
   // the status or not
@@ -36,6 +42,19 @@ public class NodeStatus implements Comparable<NodeStatus> {
   // reflect the node's load or network condition
   private long lastResponseLatency;
 
+  // if a node is judged down by heartbeats or other connection attempts, isActivated will be set
+  // to false, so further attempts to get clients for this node fail immediately instead of timing
+  // out, while heartbeat clients are still handed out so the node can be activated as soon as it
+  // is up again. Callers that use a node's clients are responsible for activating or deactivating
+  // the node.
+  private volatile boolean isActivated = true;
+
+  // if there is no heartbeat between the local node and this node, a node marked deactivated
+  // cannot be reactivated in the normal way, so we also consider it reactivated once its
+  // lastDeactivatedTime is old enough.
+  // TODO-Cluster: say hello to other nodes when a node is back online
+  private long lastDeactivatedTime;
+
   //TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
   public int compareTo(NodeStatus o) {
@@ -61,11 +80,11 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return Objects.hash(status, lastUpdateTime, lastResponseLatency);
   }
 
-  long getLastUpdateTime() {
+  public long getLastUpdateTime() {
     return lastUpdateTime;
   }
 
-  long getLastResponseLatency() {
+  public long getLastResponseLatency() {
     return lastResponseLatency;
   }
 
@@ -77,11 +96,24 @@ public class NodeStatus implements Comparable<NodeStatus> {
     this.status = status;
   }
 
-  void setLastUpdateTime(long lastUpdateTime) {
+  public void setLastUpdateTime(long lastUpdateTime) {
     this.lastUpdateTime = lastUpdateTime;
   }
 
-  void setLastResponseLatency(long lastResponseLatency) {
+  public void setLastResponseLatency(long lastResponseLatency) {
     this.lastResponseLatency = lastResponseLatency;
   }
+
+  public void activate() {
+    isActivated = true;
+  }
+
+  public void deactivate() {
+    isActivated = false;
+    lastDeactivatedTime = System.currentTimeMillis();
+  }
+
+  public boolean isActivated() {
+    return isActivated || (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
+  }
 }
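
The new fields implement a time-bounded deactivation. A minimal, standalone
model of just that rule (the class name is illustrative; the constant matches
the diff):

    // Not the IoTDB class itself: a self-contained model of the
    // activation-window rule in NodeStatus.
    class ActivationWindow {
      private static final long DEACTIVATION_VALID_INTERVAL_MS = 600_000L;
      private volatile boolean activated = true;
      private volatile long lastDeactivatedTime;

      void activate() {
        activated = true;
      }

      void deactivate() {
        activated = false;
        lastDeactivatedTime = System.currentTimeMillis();
      }

      boolean isActivated() {
        // explicitly activated, or the last deactivation has expired (the
        // one-shot start-up handshake that would normally reactivate the
        // node may have been lost)
        return activated
            || System.currentTimeMillis() - lastDeactivatedTime > DEACTIVATION_VALID_INTERVAL_MS;
      }
    }
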
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
similarity index 74%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index e592a87..dceac90 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -17,12 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -39,28 +36,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * QueryCoordinator records the spec and load of each node, deciding the order of replicas that
- * should be queried
+ * NodeStatusManager manages the status (network latency, workload, connectivity) of each node in
+ * the whole cluster. Statuses are updated on demand, so a status may be stale unless an update is
+ * forced.
  */
-public class QueryCoordinator {
-
-  private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
+public class NodeStatusManager {
 
+  private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
   // a status is considered stale if it is older than one minute and should be updated
   private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
-  private static final QueryCoordinator INSTANCE = new QueryCoordinator();
-
-  private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+  private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
   private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
 
-
-  private QueryCoordinator() {
-    // singleton class
-  }
-
-  public static QueryCoordinator getINSTANCE() {
+  public static NodeStatusManager getINSTANCE() {
     return INSTANCE;
   }
 
@@ -68,21 +58,8 @@ public class QueryCoordinator {
     this.metaGroupMember = metaGroupMember;
   }
 
-  /**
-   * Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
-   * or load) are placed first. This won't change the order of the origin list.
-   *
-   * @param nodes
-   * @return
-   */
-  public List<Node> reorderNodes(List<Node> nodes) {
-    List<Node> reordered = new ArrayList<>(nodes);
-    reordered.sort(nodeComparator);
-    return reordered;
-  }
-
   private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
-    TNodeStatus status = null;
+    TNodeStatus status;
     AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
     if (asyncMetaClient == null) {
       return null;
@@ -105,7 +82,7 @@ public class QueryCoordinator {
   }
 
   private TNodeStatus getNodeStatusWithSyncServer(Node node) {
-    TNodeStatus status = null;
+    TNodeStatus status;
     SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
     if (syncMetaClient == null) {
       logger.error("Cannot query the node status of {} for no available client", node);
@@ -123,21 +100,37 @@ public class QueryCoordinator {
     return status;
   }
 
-  private NodeStatus getNodeStatus(Node node) {
+  /**
+   * Get the status of the given node. If tryUpdate == true and the current status is older than
+   * NODE_STATUS_UPDATE_INTERVAL_MS, it will be updated first.
+   *
+   * @param node      the node whose status is queried
+   * @param tryUpdate when set to true, the manager will try to update the status of the node if it
+   *                  is old enough; otherwise, it just returns the last recorded status
+   * @return the (possibly refreshed) status of the node
+   */
+  public NodeStatus getNodeStatus(Node node, boolean tryUpdate) {
     // avoid duplicated computing of concurrent queries
     NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
     if (node.equals(metaGroupMember.getThisNode())) {
       return nodeStatus;
     }
 
+    if (tryUpdate) {
+      tryUpdateNodeStatus(node, nodeStatus);
+    }
+    return nodeStatus;
+  }
+
+  private void tryUpdateNodeStatus(Node node, NodeStatus nodeStatus) {
     long currTime = System.currentTimeMillis();
     if (nodeStatus.getStatus() != null
         && currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
-      return nodeStatus;
+      return;
     }
 
     long startTime = System.nanoTime();
-    TNodeStatus status = null;
+    TNodeStatus status;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       status = getNodeStatusWithAsyncServer(node);
     } else {
@@ -154,11 +147,10 @@ public class QueryCoordinator {
     }
     logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
         nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
-    return nodeStatus;
   }
 
   public long getLastResponseLatency(Node node) {
-    NodeStatus nodeStatus = getNodeStatus(node);
+    NodeStatus nodeStatus = getNodeStatus(node, true);
     return nodeStatus.getLastResponseLatency();
   }
 
@@ -166,4 +158,21 @@ public class QueryCoordinator {
   public void clear() {
     nodeStatusMap.clear();
   }
+
+  public void activate(Node node) {
+    getNodeStatus(node, false).activate();
+  }
+
+  public void deactivate(Node node) {
+    getNodeStatus(node, false).deactivate();
+  }
+
+  /**
+   * @param node the node to check
+   * @return whether the node is CURRENTLY activated; this method does not try to update the
+   * status, so as to avoid deadlock
+   */
+  public boolean isActivated(Node node) {
+    return getNodeStatus(node, false).isActivated();
+  }
 }
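
The tryUpdate flag separates cheap cached reads from potentially blocking
refreshes; activate, deactivate and isActivated all use the cheap path so the
client pools never trigger an RPC while holding their locks. A caller-side
sketch (illustrative; assumes a Node in scope):

    NodeStatusManager manager = NodeStatusManager.getINSTANCE();

    // Cheap read: returns the cached status and never issues an RPC.
    NodeStatus cached = manager.getNodeStatus(node, false);

    // Possibly refreshing read: queries the node if the cached status is
    // older than NODE_STATUS_UPDATE_INTERVAL_MS (may block on the RPC).
    NodeStatus fresh = manager.getNodeStatus(node, true);
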
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
index af017d4..d012d60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
similarity index 99%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4d65241..7b58051 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -18,7 +18,7 @@
  */
 
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..114aa5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -199,4 +199,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
     metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
     resultHandler.onComplete(null);
   }
+
+  @Override
+  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+    metaGroupMember.handleHandshake(sender);
+    resultHandler.onComplete(null);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..ec1519a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -191,4 +191,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   public void exile() {
     metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
   }
+
+  @Override
+  public void handshake(Node sender) {
+    metaGroupMember.handleHandshake(sender);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..28922ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StartupException;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..6bf561e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -138,7 +139,7 @@ public class DataLogApplierTest extends IoTDBTest {
     testDataGroupMember.setLeader(testDataGroupMember.getThisNode());
     testDataGroupMember.setCharacter(NodeCharacter.LEADER);
     testMetaGroupMember.setCharacter(NodeCharacter.LEADER);
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
     partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
     testMetaGroupMember.setClientProvider(new DataClientProvider(new Factory()) {
@@ -206,7 +207,7 @@ public class DataLogApplierTest extends IoTDBTest {
     testMetaGroupMember.stop();
     testMetaGroupMember.closeLogManager();
     super.tearDown();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..af9b312 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.db.service.IoTDB;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
index b2d9b55..70bef80 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.server.member.MemberTest;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -74,7 +75,7 @@ public class BaseQueryTest extends MemberTest {
       pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
       dataTypes.add(TSDataType.DOUBLE);
     }
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
     TestUtils.prepareData();
   }
 
@@ -82,7 +83,7 @@ public class BaseQueryTest extends MemberTest {
   @After
   public void tearDown() throws Exception {
     super.tearDown();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
   }
 
   void checkSequentialDataset(QueryDataSet dataSet, int offset, int size) throws IOException {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
index e7f8303..622fff0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
@@ -89,8 +91,8 @@ public class QueryCoordinatorTest {
         }
       }
     };
-    coordinator.setMetaGroupMember(metaGroupMember);
-    coordinator.clear();
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(metaGroupMember);
+    NodeStatusManager.getINSTANCE().clear();
   }
 
   @After
@@ -108,10 +110,6 @@ public class QueryCoordinatorTest {
     Collections.shuffle(unorderedNodes);
 
     List<Node> reorderedNodes = coordinator.reorderNodes(unorderedNodes);
-    for (Node orderedNode : orderedNodes) {
-      long latency = coordinator.getLastResponseLatency(orderedNode);
-      System.out.printf("%s -> %d%n", orderedNode, latency);
-    }
     assertEquals(orderedNodes, reorderedNodes);
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 7ea8b99..899b0f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..c54488b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -176,7 +177,7 @@ public class MetaGroupMemberTest extends MemberTest {
     buildDataGroups(dataClusterServer);
     testMetaMember.getThisNode().setNodeIdentifier(0);
     mockDataClusterServer = false;
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
     exiledNode = null;
     System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
   }
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 6c86a1f..1008e85 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -455,4 +455,11 @@ service TSMetaService extends RaftService {
 
   Node checkAlive()
 
+  /**
+   * When a node starts, it sends handshakes to all other nodes so they know the node is alive
+   * again. Note that heartbeats exist only between a leader and its followers, so coordinators
+   * cannot otherwise know when another node resumes; handshakes are mainly used to update node
+   * status on the coordinator side.
+   */
+  void handshake(Node sender);
 }
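
As the comment above describes, a restarting node would broadcast a handshake to every
other node once at startup. A minimal sketch of such a broadcast using the generated
synchronous Thrift client; only handshake(Node sender) comes from the interface above,
while connectTo(node) and the method itself are illustrative assumptions standing in
for whatever client lookup the cluster module provides:

    import java.util.List;

    import org.apache.iotdb.cluster.rpc.thrift.Node;
    import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
    import org.apache.thrift.TException;

    // Hypothetical startup broadcast; connectTo(node) is a placeholder
    // for obtaining a pooled client to the given node.
    void broadcastHandshake(Node thisNode, List<Node> allNodes) {
      for (Node node : allNodes) {
        if (node.equals(thisNode)) {
          continue; // no need to handshake with ourselves
        }
        try {
          TSMetaService.Client client = connectTo(node);
          client.handshake(thisNode);
        } catch (TException e) {
          // The peer may itself be down; it will handshake back when it
          // restarts, so a failure here is non-fatal.
        }
      }
    }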