You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 04:23:35 UTC
[iotdb] 01/01: refactor node management
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_node_management
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6d25ba5c2b642c4e5d3977eeb08b17f5646b8556
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 12:20:58 2020 +0800
refactor node management
---
.../cluster/client/async/AsyncClientPool.java | 80 +++++---------
.../iotdb/cluster/client/sync/SyncClientPool.java | 24 ++++-
.../apache/iotdb/cluster/log/LogDispatcher.java | 4 +-
.../cluster/log/applier/AsyncDataLogApplier.java | 4 +-
.../iotdb/cluster/log/catchup/CatchUpTask.java | 2 +-
.../iotdb/cluster/log/manage/RaftLogManager.java | 2 +-
.../cluster/query/manage/QueryCoordinator.java | 115 ++-------------------
.../iotdb/cluster/server/DataClusterServer.java | 2 +-
.../iotdb/cluster/server/MetaClusterServer.java | 10 ++
.../handlers/caller/AppendNodeEntryHandler.java | 6 +-
.../server/handlers/caller/HeartbeatHandler.java | 2 +-
.../cluster/server/member/DataGroupMember.java | 13 +--
.../cluster/server/member/MetaGroupMember.java | 48 +++++++--
.../iotdb/cluster/server/member/RaftMember.java | 78 +++++---------
.../cluster/server/{ => monitor}/NodeReport.java | 3 +-
.../manage => server/monitor}/NodeStatus.java | 42 +++++++-
.../monitor/NodeStatusManager.java} | 85 ++++++++-------
.../iotdb/cluster/server/{ => monitor}/Peer.java | 2 +-
.../iotdb/cluster/server/{ => monitor}/Timer.java | 2 +-
.../cluster/server/service/MetaAsyncService.java | 6 ++
.../cluster/server/service/MetaSyncService.java | 5 +
.../cluster/utils/nodetool/ClusterMonitor.java | 2 +-
.../cluster/log/applier/DataLogApplierTest.java | 5 +-
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 2 +-
.../apache/iotdb/cluster/query/BaseQueryTest.java | 5 +-
.../cluster/query/manage/QueryCoordinatorTest.java | 10 +-
.../caller/AppendNodeEntryHandlerTest.java | 2 +-
.../cluster/server/member/MetaGroupMemberTest.java | 3 +-
thrift/src/main/thrift/cluster.thrift | 7 ++
29 files changed, 262 insertions(+), 309 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 4127af7..be96b53 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -24,13 +24,10 @@ import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.utils.ClusterNode;
import org.apache.thrift.async.TAsyncMethodCall;
import org.slf4j.Logger;
@@ -43,34 +40,22 @@ public class AsyncClientPool {
private int maxConnectionForEachNode;
private Map<ClusterNode, Deque<AsyncClient>> clientCaches = new ConcurrentHashMap<>();
private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
- private Map<ClusterNode, Integer> nodeErrorClientCountMap = new ConcurrentHashMap<>();
private AsyncClientFactory asyncClientFactory;
- private ScheduledExecutorService cleanErrorClientExecutorService;
- // when set to true, if MAX_ERROR_COUNT errors occurs continuously when connecting to node, any
- // further requests to the node will be rejected for PROBE_NODE_STATUS_PERIOD_SECOND
- // heartbeats should not be blocked
- private boolean blockOnError;
-
- private static final int MAX_ERROR_COUNT = 3;
- private static final int PROBE_NODE_STATUS_PERIOD_SECOND = 60;
public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
- this(asyncClientFactory, true);
- }
-
- public AsyncClientPool(AsyncClientFactory asyncClientFactory, boolean blockOnError) {
this.asyncClientFactory = asyncClientFactory;
this.maxConnectionForEachNode =
ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- this.blockOnError = blockOnError;
- if (blockOnError) {
- this.cleanErrorClientExecutorService = new ScheduledThreadPoolExecutor(1,
- new BasicThreadFactory.Builder().namingPattern("clean-error-client-%d").daemon(true)
- .build());
- this.cleanErrorClientExecutorService
- .scheduleAtFixedRate(this::cleanErrorClients, PROBE_NODE_STATUS_PERIOD_SECOND,
- PROBE_NODE_STATUS_PERIOD_SECOND, TimeUnit.SECONDS);
- }
+ }
+
+ /**
+ * See getClient(Node node, boolean activatedOnly)
+   * @param node the node to connect to
+   * @return a client of the node, or null if the node is currently deactivated or no client is available
+   * @throws IOException if a new client cannot be created
+ */
+ public AsyncClient getClient(Node node) throws IOException {
+ return getClient(node, true);
}
/**
@@ -79,20 +64,24 @@ public class AsyncClientPool {
* IMPORTANT!!! The caller should check whether the return value is null or not!
*
* @param node the node want to connect
+ * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
+   *                      true, which avoids unnecessary waiting for already-down nodes, but
+ * heartbeat attempts should always try to connect so the node can be
+ * reactivated ASAP
* @return if the node can connect, return the client, otherwise null
* @throws IOException if the node can not be connected
*/
- public AsyncClient getClient(Node node) throws IOException {
+ public AsyncClient getClient(Node node, boolean activatedOnly) throws IOException {
ClusterNode clusterNode = new ClusterNode(node);
- if (blockOnError && nodeErrorClientCountMap.getOrDefault(clusterNode, 0) > MAX_ERROR_COUNT) {
- throw new IOException(String.format("connect node failed, maybe the node is down, %s", node));
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+ return null;
}
AsyncClient client;
+    // As clientCaches is a ConcurrentHashMap, computeIfAbsent is thread safe.
+ Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
+ n -> new ArrayDeque<>());
synchronized (this) {
- //As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<AsyncClient> clientStack = clientCaches.computeIfAbsent(clusterNode,
- n -> new ArrayDeque<>());
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -185,38 +174,15 @@ public class AsyncClientPool {
((AsyncMetaClient) client).close();
}
}
- clientStack.clear();
nodeClientNumMap.put(clusterNode, 0);
this.notifyAll();
- }
- if (!blockOnError) {
- return;
- }
- synchronized (this) {
- if (nodeErrorClientCountMap.containsKey(clusterNode)) {
- nodeErrorClientCountMap.put(clusterNode, nodeErrorClientCountMap.get(clusterNode) + 1);
- } else {
- nodeErrorClientCountMap.put(clusterNode, 1);
- }
- }
- if (logger.isDebugEnabled()) {
- logger.debug("the node={}, connect error times={}", clusterNode,
- nodeErrorClientCountMap.get(clusterNode));
+ NodeStatusManager.getINSTANCE().deactivate(node);
}
}
@SuppressWarnings("squid:S1135")
void onComplete(Node node) {
- ClusterNode clusterNode = new ClusterNode(node);
- // TODO: if the heartbeat client pool completes, also unblock another pool
- nodeErrorClientCountMap.remove(clusterNode);
- }
-
- void cleanErrorClients() {
- synchronized (this) {
- nodeErrorClientCountMap.clear();
- logger.debug("clean all error clients");
- }
+ NodeStatusManager.getINSTANCE().activate(node);
}
void recreateClient(Node node) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index c9c3e78..d91adb0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.utils.ClusterNode;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -49,15 +50,31 @@ public class SyncClientPool {
}
/**
+ * See getClient(Node node, boolean activatedOnly)
+   * @param node the node to connect to
+   * @return a client of the node, or null if the node is currently deactivated or no client is available
+ */
+ public Client getClient(Node node) {
+ return getClient(node, true);
+ }
+
+ /**
* Get a client of the given node from the cache if one is available, or create a new one.
* <p>
* IMPORTANT!!! The caller should check whether the return value is null or not!
*
- * @param node the node want to connect
+ * @param node the node want to connect
+ * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
+   *                      true, which avoids unnecessary waiting for already-down nodes, but heartbeat
+ * attempts should always try to connect so the node can be reactivated ASAP
* @return if the node can connect, return the client, otherwise null
*/
- public Client getClient(Node node) {
+ public Client getClient(Node node, boolean activatedOnly) {
ClusterNode clusterNode = new ClusterNode(node);
+    if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+ return null;
+ }
+
//As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
synchronized (this) {
@@ -111,12 +128,15 @@ public class SyncClientPool {
synchronized (this) {
if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
clientStack.push(client);
+ NodeStatusManager.getINSTANCE().activate(node);
} else {
try {
clientStack.push(syncClientFactory.getSyncClient(node, this));
+ NodeStatusManager.getINSTANCE().activate(node);
} catch (TTransportException e) {
logger.error("Cannot open transport for client {}", node, e);
nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1);
+ NodeStatusManager.getINSTANCE().deactivate(node);
}
}
this.notifyAll();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
index 21820d5..a813fa4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/LogDispatcher.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
index bfe57a1..e1f7c37 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/AsyncDataLogApplier.java
@@ -34,8 +34,8 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.PartialPath;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
index 23687ad..8a266ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/CatchUpTask.java
@@ -35,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.utils.TestOnly;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
index bb8b231..21b9dbd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogApplier;
import org.apache.iotdb.cluster.log.Snapshot;
import org.apache.iotdb.cluster.log.StableEntryManager;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.db.utils.TestOnly;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
index e592a87..91d193a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
@@ -19,24 +19,12 @@
package org.apache.iotdb.cluster.query.manage;
-import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
-import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
/**
* QueryCoordinator records the spec and load of each node, deciding the order of replicas that
@@ -44,17 +32,12 @@ import org.slf4j.LoggerFactory;
*/
public class QueryCoordinator {
- private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
-
// a status is considered stale if it is older than one minute and should be updated
- private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
private static final QueryCoordinator INSTANCE = new QueryCoordinator();
+ private static final NodeStatusManager STATUS_MANAGER = NodeStatusManager.getINSTANCE();
private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
- private MetaGroupMember metaGroupMember;
- private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
-
private QueryCoordinator() {
// singleton class
@@ -64,13 +47,9 @@ public class QueryCoordinator {
return INSTANCE;
}
- public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
- this.metaGroupMember = metaGroupMember;
- }
-
/**
* Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
- * or load) are placed first. This won't change the order of the origin list.
+ * or load) are placed first. This won't change the order of the original list.
*
* @param nodes
* @return
@@ -81,89 +60,7 @@ public class QueryCoordinator {
return reordered;
}
- private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
- TNodeStatus status = null;
- AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
- if (asyncMetaClient == null) {
- return null;
- }
- try {
- status = SyncClientAdaptor.queryNodeStatus(asyncMetaClient);
- } catch (TException e) {
- if (e.getCause() instanceof ConnectException) {
- logger.warn("Cannot query the node status of {}: {}", node, e.getCause());
- } else {
- logger.error("query node status failed {}", node, e);
- }
- return null;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Cannot query the node status of {}", node, e);
- return null;
- }
- return status;
- }
-
- private TNodeStatus getNodeStatusWithSyncServer(Node node) {
- TNodeStatus status = null;
- SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
- if (syncMetaClient == null) {
- logger.error("Cannot query the node status of {} for no available client", node);
- return null;
- }
- try {
- status = syncMetaClient.queryNodeStatus();
- } catch (TException e) {
- syncMetaClient.getInputProtocol().getTransport().close();
- logger.error("Cannot query the node status of {}", node, e);
- return null;
- } finally {
- ClientUtils.putBackSyncClient(syncMetaClient);
- }
- return status;
- }
-
- private NodeStatus getNodeStatus(Node node) {
- // avoid duplicated computing of concurrent queries
- NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
- if (node.equals(metaGroupMember.getThisNode())) {
- return nodeStatus;
- }
-
- long currTime = System.currentTimeMillis();
- if (nodeStatus.getStatus() != null
- && currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
- return nodeStatus;
- }
-
- long startTime = System.nanoTime();
- TNodeStatus status = null;
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- status = getNodeStatusWithAsyncServer(node);
- } else {
- status = getNodeStatusWithSyncServer(node);
- }
- long responseTime = System.nanoTime() - startTime;
-
- if (status != null) {
- nodeStatus.setStatus(status);
- nodeStatus.setLastUpdateTime(System.currentTimeMillis());
- nodeStatus.setLastResponseLatency(responseTime);
- } else {
- nodeStatus.setLastResponseLatency(Long.MAX_VALUE);
- }
- logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
- nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
- return nodeStatus;
- }
-
- public long getLastResponseLatency(Node node) {
- NodeStatus nodeStatus = getNodeStatus(node);
- return nodeStatus.getLastResponseLatency();
- }
-
- @TestOnly
- public void clear() {
- nodeStatusMap.clear();
+ public NodeStatus getNodeStatus(Node node) {
+ return STATUS_MANAGER.getNodeStatus(node, true);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 391cf69..6e5bca6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -59,7 +59,7 @@ import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Processor;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
index a0fb04d..98ae174 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java
@@ -343,4 +343,14 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async
AsyncMethodCallback<Void> resultHandler) {
asyncService.removeHardLink(hardLinkPath, resultHandler);
}
+
+ @Override
+ public void handshake(Node sender) {
+ syncService.handshake(sender);
+ }
+
+ @Override
+ public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+ asyncService.handshake(sender, resultHandler);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
index 8e6b6ce..f82f24e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandler.java
@@ -27,9 +27,9 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Peer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
index 433c9d7..7a7bab0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/handlers/caller/HeartbeatHandler.java
@@ -24,7 +24,7 @@ import static org.apache.iotdb.cluster.server.Response.RESPONSE_AGREE;
import java.net.ConnectException;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index de200cf..4af23b5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -77,12 +77,13 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport.DataMemberReport;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.DataMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.PullSnapshotHintService;
import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatThread;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -160,7 +161,7 @@ public class DataGroupMember extends RaftMember {
super("Data(" + nodes.getHeader().getIp() + ":" + nodes.getHeader().getMetaPort() + ")",
new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
new SyncClientPool(new SyncDataClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory), false),
+ new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
new AsyncClientPool(new SingleManagerFactory(factory)));
this.thisNode = thisNode;
@@ -768,7 +769,7 @@ public class DataGroupMember extends RaftMember {
return new DataMemberReport(character, leader.get(), term.get(),
logManager.getLastLogTerm(), lastReportedLogIndex, logManager.getCommitLogIndex(),
logManager.getCommitLogTerm(), getHeader(), readOnly,
- QueryCoordinator.getINSTANCE()
+ NodeStatusManager.getINSTANCE()
.getLastResponseLatency(getHeader()), lastHeartbeatReceivedTime, prevLastLogIndex,
logManager.getMaxHaveAppliedCommitIndex());
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 7d2b776..d36816c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -88,7 +88,6 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.query.ClusterPlanRouter;
-import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
@@ -105,17 +104,18 @@ import org.apache.iotdb.cluster.server.ClientServer;
import org.apache.iotdb.cluster.server.DataClusterServer;
import org.apache.iotdb.cluster.server.HardLinkCleaner;
import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.NodeReport;
-import org.apache.iotdb.cluster.server.NodeReport.MetaMemberReport;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
import org.apache.iotdb.cluster.server.handlers.caller.AppendGroupEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatThread;
import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
+import org.apache.iotdb.cluster.server.monitor.NodeReport;
+import org.apache.iotdb.cluster.server.monitor.NodeReport.MetaMemberReport;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
+import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -271,7 +271,7 @@ public class MetaGroupMember extends RaftMember {
public MetaGroupMember(TProtocolFactory factory, Node thisNode) throws QueryProcessException {
super("Meta", new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory), false),
+ new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
allNodes = new ArrayList<>();
initPeerMap();
@@ -335,7 +335,7 @@ public class MetaGroupMember extends RaftMember {
return;
}
addSeedNodes();
- QueryCoordinator.getINSTANCE().setMetaGroupMember(this);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(this);
super.start();
}
@@ -783,6 +783,7 @@ public class MetaGroupMember extends RaftMember {
try {
getDataClusterServer().buildDataGroupMembers(partitionTable);
initSubServers();
+ sendHandshake();
} catch (TTransportException | StartupException e) {
logger.error("Build partition table failed: ", e);
stop();
@@ -793,6 +794,29 @@ public class MetaGroupMember extends RaftMember {
}
/**
+ * When the node restarts, it sends handshakes to all other nodes so they may know it is back.
+ */
+ private void sendHandshake() {
+ for (Node node : allNodes) {
+ try {
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ AsyncMetaClient asyncClient = (AsyncMetaClient) getAsyncClient(node);
+ if (asyncClient != null) {
+ asyncClient.handshake(thisNode, new GenericHandler<>(node, null));
+ }
+ } else {
+ SyncMetaClient syncClient = (SyncMetaClient) getSyncClient(node);
+ if (syncClient != null) {
+ syncClient.handshake(thisNode);
+ }
+ }
+ } catch (TException e) {
+ // ignore handshake exceptions
+ }
+ }
+ }
+
+ /**
* Process the join cluster request of "node". Only proceed when the partition table is ready.
*
* @param node cannot be the local node
@@ -1022,7 +1046,7 @@ public class MetaGroupMember extends RaftMember {
private CheckStatusResponse checkStatus(Node seedNode) {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode);
+ AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode, false);
if (client == null) {
return null;
}
@@ -1035,7 +1059,7 @@ public class MetaGroupMember extends RaftMember {
logger.warn("Current thread is interrupted.");
}
} else {
- SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode);
+ SyncMetaClient client = (SyncMetaClient) getSyncClient(seedNode, false);
if (client == null) {
return null;
}
@@ -1804,7 +1828,7 @@ public class MetaGroupMember extends RaftMember {
private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, Node header)
throws IOException {
- Client client = null;
+ Client client;
try {
client = getClientProvider().getSyncDataClient(receiver,
RaftServer.getWriteOperationTimeoutMS());
@@ -1903,7 +1927,7 @@ public class MetaGroupMember extends RaftMember {
}
}
- private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) throws TException {
+ private void getNodeStatusSync(Map<Node, Boolean> nodeStatus) {
NodeStatusHandler nodeStatusHandler = new NodeStatusHandler(nodeStatus);
for (Node node : allNodes) {
SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -2175,4 +2199,8 @@ public class MetaGroupMember extends RaftMember {
public void setClientProvider(DataClientProvider dataClientProvider) {
this.dataClientProvider = dataClientProvider;
}
+
+ public void handleHandshake(Node sender) {
+ NodeStatusManager.getINSTANCE().activate(sender);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 14271dc..c2443d1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -73,11 +73,11 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
-import org.apache.iotdb.cluster.server.Timer;
-import org.apache.iotdb.cluster.server.Timer.Statistic;
+import org.apache.iotdb.cluster.server.monitor.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.utils.ClientUtils;
@@ -123,17 +123,6 @@ public abstract class RaftMember {
private static long waitLeaderTimeMs = 60 * 1000L;
/**
- * when opening a client failed (connection refused), wait for a while to avoid sending useless
- * connection requests too frequently
- */
- private static final long SYNC_CLIENT_TIMEOUT_MS = 1000;
-
- /**
- * the max retry times for get a available client
- */
- private static final int MAX_RETRY_TIMES_FOR_GET_CLIENT = 5;
-
- /**
* when the leader of this node changes, the condition will be notified so other threads that wait
* on this may be woken.
*/
@@ -558,7 +547,7 @@ public abstract class RaftMember {
* @return an asynchronous thrift client or null if the caller tries to connect the local node.
*/
public AsyncClient getAsyncHeartbeatClient(Node node) {
- return getAsyncClient(node, asyncHeartbeatClientPool);
+ return getAsyncClient(node, asyncHeartbeatClientPool, false);
}
/**
@@ -567,7 +556,7 @@ public abstract class RaftMember {
* @return the heartbeat client for the node
*/
public Client getSyncHeartbeatClient(Node node) {
- return getSyncClient(syncHeartbeatClientPool, node);
+ return getSyncClient(syncHeartbeatClientPool, node, false);
}
public void sendLogAsync(Log log, AtomicInteger voteCounter, Node node,
@@ -586,28 +575,12 @@ public abstract class RaftMember {
}
}
- private Client getSyncClient(SyncClientPool pool, Node node) {
+ private Client getSyncClient(SyncClientPool pool, Node node, boolean activatedOnly) {
if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
return null;
}
- Client client = null;
- for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
- client = pool.getClient(node);
- if (client == null) {
- // this is typically because the target server is not yet ready (connection refused), so we
- // wait for a while before reopening the transport to avoid sending requests too frequently
- try {
- Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- }
- } else {
- return client;
- }
- }
- return client;
+ return pool.getClient(node, activatedOnly);
}
public NodeCharacter getCharacter() {
@@ -1262,35 +1235,28 @@ public abstract class RaftMember {
* the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
- return getAsyncClient(node, asyncClientPool);
+ return getAsyncClient(node, asyncClientPool, true);
+ }
+
+ public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
+ return getAsyncClient(node, asyncClientPool, activatedOnly);
}
public AsyncClient getSendLogAsyncClient(Node node) {
- return getAsyncClient(node, asyncSendLogClientPool);
+ return getAsyncClient(node, asyncSendLogClientPool, true);
}
- private AsyncClient getAsyncClient(Node node, AsyncClientPool pool) {
+ private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
return null;
}
- AsyncClient client = null;
- for (int i = 0; i < MAX_RETRY_TIMES_FOR_GET_CLIENT; i++) {
- try {
- client = pool.getClient(node);
- if (!ClientUtils.isClientReady(client)) {
- Thread.sleep(SYNC_CLIENT_TIMEOUT_MS);
- } else {
- return client;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return null;
- } catch (IOException e) {
- logger.warn("{} cannot connect to node {}", name, node, e);
- }
+ try {
+ return pool.getClient(node, activatedOnly);
+ } catch (IOException e) {
+ logger.warn("{} cannot connect to node {}", name, node, e);
+ return null;
}
- return client;
}
/**
@@ -1301,7 +1267,11 @@ public abstract class RaftMember {
* @return the client if node is available, otherwise null
*/
public Client getSyncClient(Node node) {
- return getSyncClient(syncClientPool, node);
+ return getSyncClient(syncClientPool, node, true);
+ }
+
+ public Client getSyncClient(Node node, boolean activatedOnly) {
+ return getSyncClient(syncClientPool, node, activatedOnly);
}
public AtomicLong getTerm() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
similarity index 98%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
index e175248..9cbd35b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeReport.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.rpc.RpcStat;
import org.apache.iotdb.rpc.RpcTransportFactory;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
similarity index 58%
rename from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
index 6f748ec..4ec8857 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/NodeStatus.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatus.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
import java.util.Objects;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
@@ -28,6 +28,12 @@ import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
@SuppressWarnings("java:S1135")
public class NodeStatus implements Comparable<NodeStatus> {
+ // if a node was last deactivated too long ago, it is assumed activated again, because it may
+ // have restarted, but without heartbeats between the two nodes the local node has no way to
+ // learn that it is back to normal. Notice that we cannot always rely on the start-up
+ // hello, because it only occurs once and may be lost.
+ private static final long DEACTIVATION_VALID_INTERVAL_MS = 600_000L;
+
private TNodeStatus status;
// when is the status last updated, millisecond timestamp, to judge whether we should update
// the status or not
@@ -36,6 +42,19 @@ public class NodeStatus implements Comparable<NodeStatus> {
// reflect the node's load or network condition
private long lastResponseLatency;
+ // if a node is judged down by heartbeats or other attempts to connect, isActivated will be set
+ // to false, so further attempts to get clients of this node will fail fast instead of waiting
+ // for a timeout; however, getting clients for heartbeat will not fail, so the node can be
+ // activated as soon as it is up again. Clients associated with this node are responsible for
+ // activating or deactivating it.
+ private volatile boolean isActivated = true;
+
+ // if there is no heartbeat between the local node and this node, once this node is marked
+ // deactivated it cannot be reactivated through the normal path, so we also consider it
+ // reactivated when its lastDeactivatedTime is old enough.
+ // TODO-Cluster: say hello to other nodes when a node is back online
+ private long lastDeactivatedTime;
+
//TODO-Cluster: decide what should be contained in NodeStatus and how two compare two NodeStatus
@Override
public int compareTo(NodeStatus o) {
@@ -61,11 +80,11 @@ public class NodeStatus implements Comparable<NodeStatus> {
return Objects.hash(status, lastUpdateTime, lastResponseLatency);
}
- long getLastUpdateTime() {
+ public long getLastUpdateTime() {
return lastUpdateTime;
}
- long getLastResponseLatency() {
+ public long getLastResponseLatency() {
return lastResponseLatency;
}
@@ -77,11 +96,24 @@ public class NodeStatus implements Comparable<NodeStatus> {
this.status = status;
}
- void setLastUpdateTime(long lastUpdateTime) {
+ public void setLastUpdateTime(long lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
- void setLastResponseLatency(long lastResponseLatency) {
+ public void setLastResponseLatency(long lastResponseLatency) {
this.lastResponseLatency = lastResponseLatency;
}
+
+ public void activate() {
+ isActivated = true;
+ }
+
+ public void deactivate() {
+ isActivated = false;
+ lastDeactivatedTime = System.currentTimeMillis();
+ }
+
+ public boolean isActivated() {
+ return isActivated || (System.currentTimeMillis() - lastDeactivatedTime) > DEACTIVATION_VALID_INTERVAL_MS;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
similarity index 74%
copy from cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
index e592a87..dceac90 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manage/QueryCoordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/NodeStatusManager.java
@@ -17,12 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.cluster.query.manage;
+package org.apache.iotdb.cluster.server.monitor;
import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
@@ -39,28 +36,21 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * QueryCoordinator records the spec and load of each node, deciding the order of replicas that
- * should be queried
+ * NodeStatusManager manages the status (network latency, workload, connectivity) of each node in
+ * the whole cluster. The status is updated on demand, so it may not be up-to-date if not forced to
+ * update.
*/
-public class QueryCoordinator {
-
- private static final Logger logger = LoggerFactory.getLogger(QueryCoordinator.class);
+public class NodeStatusManager {
+ private static final Logger logger = LoggerFactory.getLogger(NodeStatusManager.class);
// a status is considered stale if it is older than one minute and should be updated
private static final long NODE_STATUS_UPDATE_INTERVAL_MS = 60 * 1000L;
- private static final QueryCoordinator INSTANCE = new QueryCoordinator();
-
- private final Comparator<Node> nodeComparator = Comparator.comparing(this::getNodeStatus);
+ private static final NodeStatusManager INSTANCE = new NodeStatusManager();
private MetaGroupMember metaGroupMember;
private Map<Node, NodeStatus> nodeStatusMap = new ConcurrentHashMap<>();
-
- private QueryCoordinator() {
- // singleton class
- }
-
- public static QueryCoordinator getINSTANCE() {
+ public static NodeStatusManager getINSTANCE() {
return INSTANCE;
}
@@ -68,21 +58,8 @@ public class QueryCoordinator {
this.metaGroupMember = metaGroupMember;
}
- /**
- * Reorder the given nodes based on their status, the nodes that are more suitable (have low delay
- * or load) are placed first. This won't change the order of the origin list.
- *
- * @param nodes
- * @return
- */
- public List<Node> reorderNodes(List<Node> nodes) {
- List<Node> reordered = new ArrayList<>(nodes);
- reordered.sort(nodeComparator);
- return reordered;
- }
-
private TNodeStatus getNodeStatusWithAsyncServer(Node node) {
- TNodeStatus status = null;
+ TNodeStatus status;
AsyncMetaClient asyncMetaClient = (AsyncMetaClient) metaGroupMember.getAsyncClient(node);
if (asyncMetaClient == null) {
return null;
@@ -105,7 +82,7 @@ public class QueryCoordinator {
}
private TNodeStatus getNodeStatusWithSyncServer(Node node) {
- TNodeStatus status = null;
+ TNodeStatus status;
SyncMetaClient syncMetaClient = (SyncMetaClient) metaGroupMember.getSyncClient(node);
if (syncMetaClient == null) {
logger.error("Cannot query the node status of {} for no available client", node);
@@ -123,21 +100,37 @@ public class QueryCoordinator {
return status;
}
- private NodeStatus getNodeStatus(Node node) {
+ /**
+ * Get the status of the given node. If tryUpdate == true and the current status is older than
+ * NODE_STATUS_UPDATE_INTERVAL_MS, it will be updated.
+ *
+ * @param node
+ * @param tryUpdate when set to true, the manager will try to update the status of the node if it
+ * is old enough, otherwise, it will just return the last recorded status.
+ * @return
+ */
+ public NodeStatus getNodeStatus(Node node, boolean tryUpdate) {
// avoid duplicated computing of concurrent queries
NodeStatus nodeStatus = nodeStatusMap.computeIfAbsent(node, n -> new NodeStatus());
if (node.equals(metaGroupMember.getThisNode())) {
return nodeStatus;
}
+ if (tryUpdate) {
+ tryUpdateNodeStatus(node, nodeStatus);
+ }
+ return nodeStatus;
+ }
+
+ private void tryUpdateNodeStatus(Node node, NodeStatus nodeStatus) {
long currTime = System.currentTimeMillis();
if (nodeStatus.getStatus() != null
&& currTime - nodeStatus.getLastUpdateTime() <= NODE_STATUS_UPDATE_INTERVAL_MS) {
- return nodeStatus;
+ return;
}
long startTime = System.nanoTime();
- TNodeStatus status = null;
+ TNodeStatus status;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
status = getNodeStatusWithAsyncServer(node);
} else {
@@ -154,11 +147,10 @@ public class QueryCoordinator {
}
logger.info("NodeStatus of {} is updated, status: {}, response time: {}", node,
nodeStatus.getStatus(), nodeStatus.getLastResponseLatency());
- return nodeStatus;
}
public long getLastResponseLatency(Node node) {
- NodeStatus nodeStatus = getNodeStatus(node);
+ NodeStatus nodeStatus = getNodeStatus(node, true);
return nodeStatus.getLastResponseLatency();
}
@@ -166,4 +158,21 @@ public class QueryCoordinator {
public void clear() {
nodeStatusMap.clear();
}
+
+ public void activate(Node node) {
+ getNodeStatus(node, false).activate();
+ }
+
+ public void deactivate(Node node) {
+ getNodeStatus(node, false).deactivate();
+ }
+
+ /**
+ * @param node
+ * @return whether the node is CURRENTLY available; this method will not try to update the
+ * node's status, to avoid deadlocks
+ */
+ public boolean isActivated(Node node) {
+ return getNodeStatus(node, false).isActivated();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
similarity index 97%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
index af017d4..d012d60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Peer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Peer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
similarity index 99%
rename from cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
index 4d65241..7b58051 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/Timer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/monitor/Timer.java
@@ -18,7 +18,7 @@
*/
-package org.apache.iotdb.cluster.server;
+package org.apache.iotdb.cluster.server.monitor;
import java.util.ArrayList;
import java.util.List;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
index 4ca6eb0..114aa5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java
@@ -199,4 +199,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService.
metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
resultHandler.onComplete(null);
}
+
+ @Override
+ public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
+ metaGroupMember.handleHandshake(sender);
+ resultHandler.onComplete(null);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
index 3b5f445..ec1519a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java
@@ -191,4 +191,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If
public void exile() {
metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode());
}
+
+ @Override
+ public void handshake(Node sender) {
+ metaGroupMember.handleHandshake(sender);
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
index 2892b55..28922ad 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/nodetool/ClusterMonitor.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.MetaClusterServer;
-import org.apache.iotdb.cluster.server.Timer;
+import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StartupException;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index df3cabc..6bf561e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.service.DataAsyncService;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -138,7 +139,7 @@ public class DataLogApplierTest extends IoTDBTest {
testDataGroupMember.setLeader(testDataGroupMember.getThisNode());
testDataGroupMember.setCharacter(NodeCharacter.LEADER);
testMetaGroupMember.setCharacter(NodeCharacter.LEADER);
- QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaGroupMember);
partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
testMetaGroupMember.setClientProvider(new DataClientProvider(new Factory()) {
@@ -206,7 +207,7 @@ public class DataLogApplierTest extends IoTDBTest {
testMetaGroupMember.stop();
testMetaGroupMember.closeLogManager();
super.tearDown();
- QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(partialWriteEnabled);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index c431064..af9b312 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -45,7 +45,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.server.NodeCharacter;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.apache.iotdb.db.service.IoTDB;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
index b2d9b55..70bef80 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
@@ -30,6 +30,7 @@ import java.util.List;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.query.manage.QueryCoordinator;
import org.apache.iotdb.cluster.server.member.MemberTest;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -74,7 +75,7 @@ public class BaseQueryTest extends MemberTest {
pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
dataTypes.add(TSDataType.DOUBLE);
}
- QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
TestUtils.prepareData();
}
@@ -82,7 +83,7 @@ public class BaseQueryTest extends MemberTest {
@After
public void tearDown() throws Exception {
super.tearDown();
- QueryCoordinator.getINSTANCE().setMetaGroupMember(null);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(null);
}
void checkSequentialDataset(QueryDataSet dataSet, int offset, int size) throws IOException {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
index e7f8303..622fff0 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
@@ -35,6 +35,8 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.server.monitor.NodeStatus;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
@@ -89,8 +91,8 @@ public class QueryCoordinatorTest {
}
}
};
- coordinator.setMetaGroupMember(metaGroupMember);
- coordinator.clear();
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(metaGroupMember);
+ NodeStatusManager.getINSTANCE().clear();
}
@After
@@ -108,10 +110,6 @@ public class QueryCoordinatorTest {
Collections.shuffle(unorderedNodes);
List<Node> reorderedNodes = coordinator.reorderNodes(unorderedNodes);
- for (Node orderedNode : orderedNodes) {
- long latency = coordinator.getLastResponseLatency(orderedNode);
- System.out.printf("%s -> %d%n", orderedNode, latency);
- }
assertEquals(orderedNodes, reorderedNodes);
}
}
\ No newline at end of file
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
index 7ea8b99..899b0f4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/handlers/caller/AppendNodeEntryHandlerTest.java
@@ -34,7 +34,7 @@ import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.log.Log;
-import org.apache.iotdb.cluster.server.Peer;
+import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.member.RaftMember;
import org.junit.After;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 32b0b30..c54488b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -90,6 +90,7 @@ import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.heartbeat.DataHeartbeatServer;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.service.MetaAsyncService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -176,7 +177,7 @@ public class MetaGroupMemberTest extends MemberTest {
buildDataGroups(dataClusterServer);
testMetaMember.getThisNode().setNodeIdentifier(0);
mockDataClusterServer = false;
- QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember);
+ NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
exiledNode = null;
System.out.println("Init term of metaGroupMember: " + testMetaMember.getTerm().get());
}
diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift
index 6c86a1f..1008e85 100644
--- a/thrift/src/main/thrift/cluster.thrift
+++ b/thrift/src/main/thrift/cluster.thrift
@@ -455,4 +455,11 @@ service TSMetaService extends RaftService {
Node checkAlive()
+ /**
+ * When a node starts, it sends handshakes to all other nodes so they know the node is alive
+ * again. Notice that heartbeats exist only between leaders and followers, so coordinators
+ * cannot know when another node resumes, and handshakes are mainly used to update node status
+ * on the coordinator side.
+ **/
+ void handshake(Node sender);
}