You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 04:23:35 UTC

[iotdb] 01/01: refactor node management

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_node_management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6d25ba5c2b642c4e5d3977eeb08b17f5646b8556
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 12:20:58 2020 +0800

    refactor node management
---
 .../cluster/client/async/AsyncClientPool.java      |  80 +++++---------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  24 ++++-
 .../apache/iotdb/cluster/log/LogDispatcher.java    |   4 +-
 .../cluster/log/applier/AsyncDataLogApplier.java   |   4 +-
 .../iotdb/cluster/log/catchup/CatchUpTask.java     |   2 +-
 .../iotdb/cluster/log/manage/RaftLogManager.java   |   2 +-
 .../cluster/query/manage/QueryCoordinator.java     | 115 ++-------------------
 .../iotdb/cluster/server/DataClusterServer.java    |   2 +-
 .../iotdb/cluster/server/MetaClusterServer.java    |  10 ++
 .../handlers/caller/AppendNodeEntryHandler.java    |   6 +-
 .../server/handlers/caller/HeartbeatHandler.java   |   2 +-
 .../cluster/server/member/DataGroupMember.java     |  13 +--
 .../cluster/server/member/MetaGroupMember.java     |  48 +++++++--
 .../iotdb/cluster/server/member/RaftMember.java    |  78 +++++---------
 .../cluster/server/{ => monitor}/NodeReport.java   |   3 +-
 .../manage => server/monitor}/NodeStatus.java      |  42 +++++++-
 .../monitor/NodeStatusManager.java}                |  85 ++++++++-------
 .../iotdb/cluster/server/{ => monitor}/Peer.java   |   2 +-
 .../iotdb/cluster/server/{ => monitor}/Timer.java  |   2 +-
 .../cluster/server/service/MetaAsyncService.java   |   6 ++
 .../cluster/server/service/MetaSyncService.java    |   5 +
 .../cluster/utils/nodetool/ClusterMonitor.java     |   2 +-
 .../cluster/log/applier/DataLogApplierTest.java    |   5 +-
 .../iotdb/cluster/log/catchup/CatchUpTaskTest.java |   2 +-
 .../apache/iotdb/cluster/query/BaseQueryTest.java  |   5 +-
 .../cluster/query/manage/QueryCoordinatorTest.java |  10 +-
 .../caller/AppendNodeEntryHandlerTest.java         |   2 +-
 .../cluster/server/member/MetaGroupMemberTest.java |   3 +-
 thrift/src/main/thrift/cluster.thrift              |   7 ++
 29 files changed, 262 insertions(+), 309 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 4127af7..be96b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -24,13 +24,10 @@ import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.thrift.async.TAsyncMethodCall;
 import org.slf4j.Logger;
@@ -43,34 +40,22 @@ public class AsyncClientPool {
   private int maxConnectionForEachNode;
   private Map<ClusterNode, Deque<AsyncClient>> clientCaches = new ConcurrentHashMap<>();
   private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
-  private Map<ClusterNode, Integer> nodeErrorClientCountMap = new ConcurrentHashMap<>();
   private AsyncClientFactory asyncClientFactory;
-  private ScheduledExecutorService cleanErrorClientExecutorService;
-  // when set to true, if MAX_ERROR_COUNT errors occurs continuously when connecting to node, any
-  // further requests to the node will be rejected for PROBE_NODE_STATUS_PERIOD_SECOND
-  // heartbeats should not be blocked
-  private boolean blockOnError;
-
-  private static final int MAX_ERROR_COUNT = 3;
-  private static final int PROBE_NODE_STATUS_PERIOD_SECOND = 60;
 
   public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
-    this(asyncClientFactory, true);
-  }
-
-  public AsyncClientPool(AsyncClientFactory asyncClientFactory, boolean blockOnError) {
     this.asyncClientFactory = asyncClientFactory;
     this.maxConnectionForEachNode =
         ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
-    this.blockOnError = blockOnError;
-    if (blockOnError) {
-      this.cleanErrorClientExecutorService = new ScheduledThreadPoolExecutor(1,
-          new BasicThreadFactory.Builder().namingPattern("clean-error-client-%d").daemon(true)
-              .build());
-      this.cleanErrorClientExecutorService
-          .scheduleAtFixedRate(this::cleanErrorClients, PROBE_NODE_STATUS_PERIOD_SECOND,
-              PROBE_NODE_STATUS_PERIOD_SECOND, TimeUnit.SECONDS);
-    }
+  }
+
+  /**
+   * See getClient(Node node, boolean activatedOnly); equivalent to activatedOnly = true.
+   * @param node the node to connect to
+   * @return a client of the given node, or null if none is available
+   * @throws IOException if the node cannot be connected
+   */
+  public AsyncClient getClient(Node node) throws IOException {
+    return getClient(node, true);
   }
 
   /**
@@ -79,20 +64,24 @@ public class AsyncClientPool {
    * IMPORTANT!!! The caller should check whether the return value is null or not!
    *
    * @param node the node want to connect
+   * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
+   *                      true, which avoids unnecessary waits for already-down nodes; however,
+   *                      heartbeat attempts should always try to connect so the node can be
+   *                      reactivated ASAP
    * @return if the node can connect, return the client, otherwise null
    * @throws IOException if the node can not be connected
    */
-  public AsyncClient getClient(Node node) throws IOException {
+  public AsyncClient getClient(Node node, boolean activatedOnly) throws IOException {
     ClusterNode clusterNode = new ClusterNode(node);
-    if (blockOnError && nodeErrorClientCountMap.getOrDefault(clusterNode, 0) > MAX_ERROR_COUNT) {
-      throw new IOException(String.format("connect node failed, maybe the node is down, %s", node));
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
     }
 
     AsyncClient client;
+    // As clientCaches is a ConcurrentHashMap, computeIfAbsent is thread-safe.
+    Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
+        n -> new ArrayDeque<>());
     synchronized (this) {
-      //As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
-      Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
-          n -> new ArrayDeque<>());
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -185,38 +174,15 @@ public class AsyncClientPool {
           ((AsyncMetaClient) client).close();
         }
       }
-      clientStack.clear();
       nodeClientNumMap.put(clusterNode, 0);
       this.notifyAll();
-    }
-    if (!blockOnError) {
-      return;
-    }
-    synchronized (this) {
-      if (nodeErrorClientCountMap.containsKey(clusterNode)) {
-        nodeErrorClientCountMap.put(clusterNode, nodeErrorClientCountMap.get(clusterNode) + 1);
-      } else {
-        nodeErrorClientCountMap.put(clusterNode, 1);
-      }
-    }
-    if (logger.isDebugEnabled()) {
-      logger.debug("the node={}, connect error times={}", clusterNode,
-          nodeErrorClientCountMap.get(clusterNode));
+      NodeStatusManager.getINSTANCE().deactivate(node);
     }
   }
 
   @SuppressWarnings("squid:S1135")
   void onComplete(Node node) {
-    ClusterNode clusterNode = new ClusterNode(node);
-    // TODO: if the heartbeat client pool completes, also unblock another pool
-    nodeErrorClientCountMap.remove(clusterNode);
-  }
-
-  void cleanErrorClients() {
-    synchronized (this) {
-      nodeErrorClientCountMap.clear();
-      logger.debug("clean all error clients");
-    }
+    NodeStatusManager.getINSTANCE().activate(node);
   }
 
   void recreateClient(Node node) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index c9c3e78..d91adb0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.utils.ClusterNode;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -49,15 +50,31 @@ public class SyncClientPool {
   }
 
   /**
+   * See getClient(Node node, boolean activatedOnly); equivalent to activatedOnly = true.
+   * @param node the node to connect to
+   * @return a client of the given node, or null if none is available
+   */
+  public Client getClient(Node node) {
+    return getClient(node, true);
+  }
+
+  /**
    * Get a client of the given node from the cache if one is available, or create a new one.
    * <p>
    * IMPORTANT!!! The caller should check whether the return value is null or not!
    *
-   * @param node the node want to connect
+   * @param node          the node want to connect
+   * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
+   *                      true, which avoids unnecessary waits for already-down nodes, but heartbeat
+   *                      attempts should always try to connect so the node can be reactivated ASAP
    * @return if the node can connect, return the client, otherwise null
    */
-  public Client getClient(Node node) {
+  public Client getClient(Node node, boolean activatedOnly) {
     ClusterNode clusterNode = new ClusterNode(node);
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+      return null;
+    }
+
     //As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
     synchronized (this) {
@@ -111,12 +128,15 @@ public class SyncClientPool {
     synchronized (this) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
+        NodeStatusManager.getINSTANCE().activate(node);
       } else {
         try {
           clientStack.push(syncClientFactory.getSyncClient(node, this));
+          NodeStatusManager.getINSTANCE().activate(node);
         } catch (TTransportException e) {
           logger.error("Cannot open transport for client {}", node, e);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1);
+          NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
       this.notifyAll();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 21820d5..a813fa4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index bfe57a1..e1f7c37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
 import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.metadata.PartialPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..8a266ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.db.utils.TestOnly;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..21b9dbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.log.LogApplier;
 import org.apache.iotdb.cluster.log.Snapshot;
 import org.apache.iotdb.cluster.log.StableEntryManager;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index e592a87..91d193a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -19,24 +19,12 @@
 
 package org.apache.iotdb.cluster.query.manage;
 
-import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 
 /**
  * QueryCoordinator records the spec and load of each node, deciding the order of replicas that
@@ -44,17 +32,12 @@ import org.slf4j.LoggerFactory;
  */
 public class QueryCoordinator {
 
-  private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
-
   // a status is considered stale if it is older than one minute and should be updated
-  private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
   private static final QueryCoordinator INSTANCE = new QueryCoordinator();
+  private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
 
   private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
 
-  private MetaGroupMember metaGroupMember;
-  private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
-
 
   private QueryCoordinator() {
     // singleton class
@@ -64,13 +47,9 @@ public class QueryCoordinator {
     return INSTANCE;
   }
 
-  public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
-    this.metaGroupMember = metaGroupMember;
-  }
-
   /**
    * Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
-   * or load) are placed first. This won't change the order of the origin list.
+   * or load) are placed first. This won't change the order of the original list.
    *
    * @param nodes
    * @return
@@ -81,89 +60,7 @@ public class QueryCoordinator {
     return reordered;
   }
 
-  private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
-    TNodeStatus status = null;
-    AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
-    if (asyncMetaClient == null) {
-      return null;
-    }
-    try {
-      status = SyncClientAdaptor.queryNodeStatus(asyncMetaClient);
-    } catch (TException e) {
-      if (e.getCause() instanceof ConnectException) {
-        logger.warn("Cannot query the node status of {}: {}", node, e.getCause());
-      } else {
-        logger.error("query node status failed {}", node, e);
-      }
-      return null;
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      logger.error("Cannot query the node status of {}", node, e);
-      return null;
-    }
-    return status;
-  }
-
-  private TNodeStatus getNodeStatusWithSyncServer(Node node) {
-    TNodeStatus status = null;
-    SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
-    if (syncMetaClient == null) {
-      logger.error("Cannot query the node status of {} for no available client", node);
-      return null;
-    }
-    try {
-      status = syncMetaClient.queryNodeStatus();
-    } catch (TException e) {
-      syncMetaClient.getInputProtocol().getTransport().close();
-      logger.error("Cannot query the node status of {}", node, e);
-      return null;
-    } finally {
-      ClientUtils.putBackSyncClient(syncMetaClient);
-    }
-    return status;
-  }
-
-  private NodeStatus getNodeStatus(Node node) {
-    // avoid duplicated computing of concurrent queries
-    NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
-    if (node.equals(metaGroupMember.getThisNode())) {
-      return nodeStatus;
-    }
-
-    long currTime = System.currentTimeMillis();
-    if (nodeStatus.getStatus() != null
-        && currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
-      return nodeStatus;
-    }
-
-    long startTime = System.nanoTime();
-    TNodeStatus status = null;
-    if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      status = getNodeStatusWithAsyncServer(node);
-    } else {
-      status = getNodeStatusWithSyncServer(node);
-    }
-    long responseTime = System.nanoTime() - startTime;
-
-    if (status != null) {
-      nodeStatus.setStatus(status);
-      nodeStatus.setLastUpdateTime(System.currentTimeMillis());
-      nodeStatus.setLastResponseLatency(responseTime);
-    } else {
-      nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
-    }
-    logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
-        nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
-    return nodeStatus;
-  }
-
-  public long getLastResponseLatency(Node node) {
-    NodeStatus nodeStatus = getNodeStatus(node);
-    return nodeStatus.getLastResponseLatency();
-  }
-
-  @TestOnly
-  public void clear() {
-    nodeStatusMap.clear();
+  public NodeStatus getNodeStatus(Node node) {
+    return STATUS_MANAGER.getNodeStatus(node, true);
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..6e5bca6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -59,7 +59,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Processor;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..98ae174 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -343,4 +343,14 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
       AsyncMethodCallback<Void> resultHandler) {
     asyncService.removeHardLink(hardLinkPath, resultHandler);
   }
+
+  @Override
+  public void handshake(Node sender) {
+    syncService.handshake(sender);
+  }
+
+  @Override
+  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+    asyncService.handshake(sender, resultHandler);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 8e6b6ce..f82f24e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 433c9d7..7a7bab0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -24,7 +24,7 @@ import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
 import java.net.ConnectException;
 import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..4af23b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -77,12 +77,13 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
 import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.PullSnapshotHintService;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
 import org.apache.iotdb.cluster.utils.StatusUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -160,7 +161,7 @@ public class DataGroupMember extends RaftMember {
     super("Data(" + nodes.getHeader().getIp() + ":" + nodes.getHeader().getMetaPort() + ")",
         new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataClient.FactorySync(factory)),
-        new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory), false),
+        new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
         new AsyncClientPool(new SingleManagerFactory(factory)));
     this.thisNode = thisNode;
@@ -768,7 +769,7 @@ public class DataGroupMember extends RaftMember {
     return new DataMemberReport(character, leader.get(), term.get(),
         logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(),
         logManager.getCommitLogTerm(), getHeader(), readOnly,
-        QueryCoordinator.getINSTANCE()
+        NodeStatusManager.getINSTANCE()
             .getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex,
         logManager.getMaxHaveAppliedCommitIndex());
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7d2b776..d36816c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -88,7 +88,6 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
 import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.query.ClusterPlanRouter;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
 import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
 import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -105,17 +104,18 @@ import org.apache.iotdb.cluster.server.ClientServer;
 import org.apache.iotdb.cluster.server.DataClusterServer;
 import org.apache.iotdb.cluster.server.HardLinkCleaner;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport;
-import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
 import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatThread;
 import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.MetaMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.utils.ClientUtils;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -271,7 +271,7 @@ public class MetaGroupMember extends RaftMember {
   public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
     super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
-        new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+        new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
         new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
     allNodes = new ArrayList<>();
     initPeerMap();
@@ -335,7 +335,7 @@ public class MetaGroupMember extends RaftMember {
       return;
     }
     addSeedNodes();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(this);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(this);
     super.start();
   }
 
@@ -783,6 +783,7 @@ public class MetaGroupMember extends RaftMember {
       try {
         getDataClusterServer().buildDataGroupMembers(partitionTable);
         initSubServers();
+        sendHandshake();
       } catch (TTransportException | StartupException e) {
         logger.error("Build partition table failed: ", e);
         stop();
@@ -793,6 +794,29 @@ public class MetaGroupMember extends RaftMember {
   }
 
   /**
+   * When the node restarts, it sends handshakes to all other nodes so they may know it is back.
+   */
+  private void sendHandshake() {
+    for (Node node : allNodes) {
+      try {
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncMetaClient asyncClient = (AsyncMetaClient) getAsyncClient(node);
+          if (asyncClient != null) {
+            asyncClient.handshake(thisNode, new GenericHandler<>(node, null));
+          }
+        } else {
+          SyncMetaClient syncClient = (SyncMetaClient) getSyncClient(node);
+          if (syncClient != null) {
+            syncClient.handshake(thisNode);
+          }
+        }
+      } catch (TException e) {
+        // ignore handshake exceptions
+      }
+    }
+  }
+
+  /**
    * Process the join cluster request of "node". Only proceed when the partition table is ready.
    *
    * @param node cannot be the local node
@@ -1022,7 +1046,7 @@ public class MetaGroupMember extends RaftMember {
 
   private CheckStatusResponse checkStatus(Node seedNode) {
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
-      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
+      AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode, false);
       if (client == null) {
         return null;
       }
@@ -1035,7 +1059,7 @@ public class MetaGroupMember extends RaftMember {
         logger.warn("Current thread is interrupted.");
       }
     } else {
-      SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode);
+      SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode, false);
       if (client == null) {
         return null;
       }
@@ -1804,7 +1828,7 @@ public class MetaGroupMember extends RaftMember {
 
   private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
       throws IOException {
-    Client client = null;
+    Client client;
     try {
       client = getClientProvider().getSyncDataClient(receiver,
           RaftServer.getWriteOperationTimeoutMS());
@@ -1903,7 +1927,7 @@ public class MetaGroupMember extends RaftMember {
     }
   }
 
-  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) throws TException {
+  private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) {
     NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
     for (Node node : allNodes) {
       SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -2175,4 +2199,8 @@ public class MetaGroupMember extends RaftMember {
   public void setClientProvider(DataClientProvider dataClientProvider) {
     this.dataClientProvider = dataClientProvider;
   }
+
+  public void handleHandshake(Node sender) {
+    NodeStatusManager.getINSTANCE().activate(sender);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 14271dc..c2443d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -73,11 +73,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
 import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -123,17 +123,6 @@ public abstract class RaftMember {
   private static long waitLeaderTimeMs = 60 * 1000L;
 
   /**
-   * when opening a client failed (connection refused), wait for a while to avoid sending useless
-   * connection requests too frequently
-   */
-  private static final long SYNC_CLIENT_TIMEOUT_MS = 1000;
-
-  /**
-   * the max retry times for get a available client
-   */
-  private static final int MAX_RETRY_TIMES_FOR_GET_CLIENT = 5;
-
-  /**
    * when the leader of this node changes, the condition will be notified so other threads that wait
    * on this may be woken.
    */
@@ -558,7 +547,7 @@ public abstract class RaftMember {
    * @return an asynchronous thrift client or null if the caller tries to connect the local node.
    */
   public AsyncClient getAsyncHeartbeatClient(Node node) {
-    return getAsyncClient(node, asyncHeartbeatClientPool);
+    return getAsyncClient(node, asyncHeartbeatClientPool, false);
   }
 
   /**
@@ -567,7 +556,7 @@ public abstract class RaftMember {
    * @return the heartbeat client for the node
    */
   public Client getSyncHeartbeatClient(Node node) {
-    return getSyncClient(syncHeartbeatClientPool, node);
+    return getSyncClient(syncHeartbeatClientPool, node, false);
   }
 
   public void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
@@ -586,28 +575,12 @@ public abstract class RaftMember {
     }
   }
 
-  private Client getSyncClient(SyncClientPool pool, Node node) {
+  private Client getSyncClient(SyncClientPool pool, Node node, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
 
-    Client client = null;
-    for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
-      client = pool.getClient(node);
-      if (client == null) {
-        // this is typically because the target server is not yet ready (connection refused), so we
-        // wait for a while before reopening the transport to avoid sending requests too frequently
-        try {
-          Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return null;
-        }
-      } else {
-        return client;
-      }
-    }
-    return client;
+    return pool.getClient(node, activatedOnly);
   }
 
   public NodeCharacter getCharacter() {
@@ -1262,35 +1235,28 @@ public abstract class RaftMember {
    * the node cannot be reached.
    */
   public AsyncClient getAsyncClient(Node node) {
-    return getAsyncClient(node, asyncClientPool);
+    return getAsyncClient(node, asyncClientPool, true);
+  }
+
+  public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
+    return getAsyncClient(node, asyncClientPool, activatedOnly);
   }
 
   public AsyncClient getSendLogAsyncClient(Node node) {
-    return getAsyncClient(node, asyncSendLogClientPool);
+    return getAsyncClient(node, asyncSendLogClientPool, true);
   }
 
-  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool) {
+  private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
     if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
       return null;
     }
 
-    AsyncClient client = null;
-    for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
-      try {
-        client = pool.getClient(node);
-        if (!ClientUtils.isClientReady(client)) {
-          Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
-        } else {
-          return client;
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return null;
-      } catch (IOException e) {
-        logger.warn("{} cannot connect to node {}", name, node, e);
-      }
+    try {
+      return pool.getClient(node, activatedOnly);
+    } catch (IOException e) {
+      logger.warn("{} cannot connect to node {}", name, node, e);
+      return null;
     }
-    return client;
   }
 
   /**
@@ -1301,7 +1267,11 @@ public abstract class RaftMember {
    * @return the client if node is available, otherwise null
    */
   public Client getSyncClient(Node node) {
-    return getSyncClient(syncClientPool, node);
+    return getSyncClient(syncClientPool, node, true);
+  }
+
+  public Client getSyncClient(Node node, boolean activatedOnly) {
+    return getSyncClient(syncClientPool, node, activatedOnly);
   }
 
   public AtomicLong getTerm() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index e175248..9cbd35b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.rpc.RpcStat;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f748ec..4ec8857 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.Objects;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
@@ -28,6 +28,12 @@ import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 @SuppressWarnings("java:S1135")
 public class NodeStatus implements Comparable<NodeStatus> {
 
+  // if a node was deactivated too long ago, it is also assumed activated because it may
+  // have restarted but there is no heartbeat between the two nodes, and the local node cannot
+  // know the fact that it is normal again. Notice that we cannot always rely on the start-up
+  // hello, because it only occurs once and may be lost.
+  private static final long DEACTIVATION_VALID_INTERVAL_MS = 600_000L;
+
   private TNodeStatus status;
   // when is the status last updated, millisecond timestamp, to judge whether we should update
   // the status or not
@@ -36,6 +42,19 @@ public class NodeStatus implements Comparable<NodeStatus> {
   // reflect the node's load or network condition
   private long lastResponseLatency;
 
+  // if a node is judged down by heartbeats or other attempts to connect, isActivated will be set
+  // to false, so further attempts to get clients of this node will fail without a timeout, but
+  // getting clients for heartbeat will not fail so the node can be activated as soon as it is up
+  // again. Clients of associated nodes should take the responsibility to activate or deactivate
+  // the node.
+  private volatile boolean isActivated = true;
+
+  // if there is no heartbeat between the local node and this node, when this node is marked
+  // deactivated, it cannot be reactivated in a normal way. So we also consider it reactivated if
+  // its lastDeactivatedTime is too old.
+  // TODO-Cluster: say hello to other nodes when a node is back online
+  private long lastDeactivatedTime;
+
   //TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
   @Override
   public int compareTo(NodeStatus o) {
@@ -61,11 +80,11 @@ public class NodeStatus implements Comparable<NodeStatus> {
     return Objects.hash(status, lastUpdateTime, lastResponseLatency);
   }
 
-  long getLastUpdateTime() {
+  public long getLastUpdateTime() {
     return lastUpdateTime;
   }
 
-  long getLastResponseLatency() {
+  public long getLastResponseLatency() {
     return lastResponseLatency;
   }
 
@@ -77,11 +96,24 @@ public class NodeStatus implements Comparable<NodeStatus> {
     this.status = status;
   }
 
-  void setLastUpdateTime(long lastUpdateTime) {
+  public void setLastUpdateTime(long lastUpdateTime) {
     this.lastUpdateTime = lastUpdateTime;
   }
 
-  void setLastResponseLatency(long lastResponseLatency) {
+  public void setLastResponseLatency(long lastResponseLatency) {
     this.lastResponseLatency = lastResponseLatency;
   }
+
+  public void activate() {
+    isActivated = true;
+  }
+
+  public void deactivate() {
+    isActivated = false;
+    lastDeactivatedTime = System.currentTimeMillis();
+  }
+
+  public boolean isActivated() {
+    return isActivated || (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
similarity index 74%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index e592a87..dceac90 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -17,12 +17,9 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -39,28 +36,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * QueryCoordinator records the spec and load of each node, deciding the order of replicas that
- * should be queried
+ * NodeStatusManager manages the status (network latency, workload, connectivity) of each node in
+ * the whole cluster. The status is updated on demand, so it may not be up-to-date if not forced to
+ * update.
  */
-public class QueryCoordinator {
-
-  private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
+public class NodeStatusManager {
 
+  private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
   // a status is considered stale if it is older than one minute and should be updated
   private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
-  private static final QueryCoordinator INSTANCE = new QueryCoordinator();
-
-  private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+  private static final NodeStatusManager INSTANCE = new NodeStatusManager();
 
   private MetaGroupMember metaGroupMember;
   private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
 
-
-  private QueryCoordinator() {
-    // singleton class
-  }
-
-  public static QueryCoordinator getINSTANCE() {
+  public static NodeStatusManager getINSTANCE() {
     return INSTANCE;
   }
 
@@ -68,21 +58,8 @@ public class QueryCoordinator {
     this.metaGroupMember = metaGroupMember;
   }
 
-  /**
-   * Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
-   * or load) are placed first. This won't change the order of the origin list.
-   *
-   * @param nodes
-   * @return
-   */
-  public List<Node> reorderNodes(List<Node> nodes) {
-    List<Node> reordered = new ArrayList<>(nodes);
-    reordered.sort(nodeComparator);
-    return reordered;
-  }
-
   private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
-    TNodeStatus status = null;
+    TNodeStatus status;
     AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
     if (asyncMetaClient == null) {
       return null;
@@ -105,7 +82,7 @@ public class QueryCoordinator {
   }
 
   private TNodeStatus getNodeStatusWithSyncServer(Node node) {
-    TNodeStatus status = null;
+    TNodeStatus status;
     SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
     if (syncMetaClient == null) {
       logger.error("Cannot query the node status of {} for no available client", node);
@@ -123,21 +100,37 @@ public class QueryCoordinator {
     return status;
   }
 
-  private NodeStatus getNodeStatus(Node node) {
+  /**
+   * Get the status of the given node. If tryUpdate == true and the current status is older than
+   * NODE_STATUS_UPDATE_INTERVAL_MS, it will be updated.
+   *
+   * @param node
+   * @param tryUpdate when set to true, the manager will try to update the status of the node if it
+   *                  is old enough, otherwise, it will just return the last recorded status.
+   * @return
+   */
+  public NodeStatus getNodeStatus(Node node, boolean tryUpdate) {
     // avoid duplicated computing of concurrent queries
     NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
     if (node.equals(metaGroupMember.getThisNode())) {
       return nodeStatus;
     }
 
+    if (tryUpdate) {
+      tryUpdateNodeStatus(node, nodeStatus);
+    }
+    return nodeStatus;
+  }
+
+  private void tryUpdateNodeStatus(Node node, NodeStatus nodeStatus) {
     long currTime = System.currentTimeMillis();
     if (nodeStatus.getStatus() != null
         && currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
-      return nodeStatus;
+      return;
     }
 
     long startTime = System.nanoTime();
-    TNodeStatus status = null;
+    TNodeStatus status;
     if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
       status = getNodeStatusWithAsyncServer(node);
     } else {
@@ -154,11 +147,10 @@ public class QueryCoordinator {
     }
     logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
         nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
-    return nodeStatus;
   }
 
   public long getLastResponseLatency(Node node) {
-    NodeStatus nodeStatus = getNodeStatus(node);
+    NodeStatus nodeStatus = getNodeStatus(node, true);
     return nodeStatus.getLastResponseLatency();
   }
 
@@ -166,4 +158,21 @@ public class QueryCoordinator {
   public void clear() {
     nodeStatusMap.clear();
   }
+
+  public void activate(Node node) {
+    getNodeStatus(node, false).activate();
+  }
+
+  public void deactivate(Node node) {
+    getNodeStatus(node, false).deactivate();
+  }
+
+  /**
+   * @param node
+   * @return whether the node is CURRENTLY available; this method will not try to update its status
+   * to avoid deadlock
+   */
+  public boolean isActivated(Node node) {
+    return getNodeStatus(node, false).isActivated();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
index af017d4..d012d60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
similarity index 99%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4d65241..7b58051 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -18,7 +18,7 @@
  */
 
 
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..114aa5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -199,4 +199,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
     metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
     resultHandler.onComplete(null);
   }
+
+  @Override
+  public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+    metaGroupMember.handleHandshake(sender);
+    resultHandler.onComplete(null);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..ec1519a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -191,4 +191,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
   public void exile() {
     metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
   }
+
+  @Override
+  public void handshake(Node sender) {
+    metaGroupMember.handleHandshake(sender);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..28922ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
 import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.server.MetaClusterServer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StartupException;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..6bf561e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.NodeCharacter;
 import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.DataAsyncService;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -138,7 +139,7 @@ public class DataLogApplierTest extends IoTDBTest {
     testDataGroupMember.setLeader(testDataGroupMember.getThisNode());
     testDataGroupMember.setCharacter(NodeCharacter.LEADER);
     testMetaGroupMember.setCharacter(NodeCharacter.LEADER);
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
     partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
     testMetaGroupMember.setClientProvider(new DataClientProvider(new Factory()) {
@@ -206,7 +207,7 @@ public class DataLogApplierTest extends IoTDBTest {
     testMetaGroupMember.stop();
     testMetaGroupMember.closeLogManager();
     super.tearDown();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
     IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
   }
 
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..af9b312 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
 import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
 import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.apache.iotdb.db.service.IoTDB;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
index b2d9b55..70bef80 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
 import org.apache.iotdb.cluster.server.member.MemberTest;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.aggregation.AggregateResult;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -74,7 +75,7 @@ public class BaseQueryTest extends MemberTest {
       pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
       dataTypes.add(TSDataType.DOUBLE);
     }
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
     TestUtils.prepareData();
   }
 
@@ -82,7 +83,7 @@ public class BaseQueryTest extends MemberTest {
   @After
   public void tearDown() throws Exception {
     super.tearDown();
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
   }
 
   void checkSequentialDataset(QueryDataSet dataSet, int offset, int size) throws IOException {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
index e7f8303..622fff0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
 import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
 import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.apache.thrift.protocol.TBinaryProtocol.Factory;
 import org.junit.After;
@@ -89,8 +91,8 @@ public class QueryCoordinatorTest {
         }
       }
     };
-    coordinator.setMetaGroupMember(metaGroupMember);
-    coordinator.clear();
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(metaGroupMember);
+    NodeStatusManager.getINSTANCE().clear();
   }
 
   @After
@@ -108,10 +110,6 @@ public class QueryCoordinatorTest {
     Collections.shuffle(unorderedNodes);
 
     List<Node> reorderedNodes = coordinator.reorderNodes(unorderedNodes);
-    for (Node orderedNode : orderedNodes) {
-      long latency = coordinator.getLastResponseLatency(orderedNode);
-      System.out.printf("%s -> %d%n", orderedNode, latency);
-    }
     assertEquals(orderedNodes, reorderedNodes);
   }
 }
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 7ea8b99..899b0f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.cluster.common.TestMetaGroupMember;
 import org.apache.iotdb.cluster.common.TestUtils;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.member.RaftMember;
 import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..c54488b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
 import org.apache.iotdb.cluster.server.Response;
 import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
 import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
 import org.apache.iotdb.cluster.server.service.MetaAsyncService;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -176,7 +177,7 @@ public class MetaGroupMemberTest extends MemberTest {
     buildDataGroups(dataClusterServer);
     testMetaMember.getThisNode().setNodeIdentifier(0);
     mockDataClusterServer = false;
-    QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+    NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
     exiledNode = null;
     System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
   }
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 6c86a1f..1008e85 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -455,4 +455,11 @@ service TSMetaService extends RaftService {
 
   Node checkAlive()
 
+  /**
+  * When a node starts, it sends handshakes to all other nodes so they know the node is alive
+  * again. Notice that heartbeats exist only between leaders and followers, so coordinators
+  * cannot know when another node resumes, and handshakes are mainly used to update node status
+  * on coordinator side.
+  **/
+  void handshake(Node sender);
 }