You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/07/06 14:22:39 UTC
[iotdb] branch master updated: [IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 05ffd8893b [IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)
05ffd8893b is described below
commit 05ffd8893bc4683e7bd5ed928f3e6d362ac3d697
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Wed Jul 6 22:22:32 2022 +0800
[IOTDB-3711] Update RegionRouteMap based on heartbeat sample (#6590)
---
.../confignode/client/AsyncDataNodeClientPool.java | 19 +++++
.../handlers/ConfigNodeHeartbeatHandler.java | 6 +-
.../client/handlers/DataNodeHeartbeatHandler.java | 24 ++++--
.../handlers/UpdateRegionRouteMapHandler.java | 58 +++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 2 +-
.../iotdb/confignode/manager/NodeManager.java | 6 +-
.../iotdb/confignode/manager/load/LoadManager.java | 98 ++++++++++++++++++----
.../load/heartbeat/ConfigNodeHeartbeatCache.java | 40 ++++++---
.../load/heartbeat/DataNodeHeartbeatCache.java | 32 ++++---
.../{IHeartbeatStatistic.java => INodeCache.java} | 16 ++--
.../manager/load/heartbeat/IRegionGroupCache.java | 16 ++--
...rtbeatPackage.java => NodeHeartbeatSample.java} | 6 +-
.../manager/load/heartbeat/RegionGroupCache.java | 59 +++++++++++--
...beatPackage.java => RegionHeartbeatSample.java} | 21 ++++-
14 files changed, 328 insertions(+), 75 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index c8c0e052f8..4f620f35ef 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -31,12 +31,14 @@ import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.handlers.FlushHandler;
import org.apache.iotdb.confignode.client.handlers.FunctionManagementHandler;
import org.apache.iotdb.confignode.client.handlers.SetTTLHandler;
+import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -309,6 +311,23 @@ public class AsyncDataNodeClientPool {
}
}
+ /**
+ * Update the RegionRouteMap cache on specific DataNode
+ *
+ * @param endPoint The specific DataNode
+ */
+ public void updateRegionRouteMap(
+ TEndPoint endPoint, TRegionRouteReq regionRouteReq, UpdateRegionRouteMapHandler handler) {
+ // TODO: Add a retry logic
+ try {
+ clientManager.borrowClient(endPoint).updateRegionCache(regionRouteReq, handler);
+ } catch (IOException e) {
+ LOGGER.error("Can't connect to DataNode {}", endPoint, e);
+ } catch (TException e) {
+ LOGGER.error("Update RegionRouteMap on DataNode {} failed", endPoint, e);
+ }
+ }
+
// TODO: Is the ClientPool must be a singleton?
private static class ClientPoolHolder {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
index 4a3b6b94f7..d177a13cad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/ConfigNodeHeartbeatHandler.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.confignode.client.handlers;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
@@ -42,8 +42,8 @@ public class ConfigNodeHeartbeatHandler implements AsyncMethodCallback<Long> {
@Override
public void onComplete(Long timestamp) {
- configNodeHeartbeatCache.cacheHeartBeat(
- new HeartbeatPackage(timestamp, System.currentTimeMillis()));
+ configNodeHeartbeatCache.cacheHeartbeatSample(
+ new NodeHeartbeatSample(timestamp, System.currentTimeMillis()));
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
index 4dcb53cb3b..d6c02d3712 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/DataNodeHeartbeatHandler.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.confignode.client.handlers;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatPackage;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -52,21 +53,26 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
@Override
public void onComplete(THeartbeatResp heartbeatResp) {
- dataNodeHeartbeatCache.cacheHeartBeat(
- new HeartbeatPackage(heartbeatResp.getHeartbeatTimestamp(), System.currentTimeMillis()));
+ long receiveTime = System.currentTimeMillis();
+ // Update NodeCache
+ dataNodeHeartbeatCache.cacheHeartbeatSample(
+ new NodeHeartbeatSample(heartbeatResp.getHeartbeatTimestamp(), receiveTime));
+
+ // Update RegionCache
if (heartbeatResp.isSetJudgedLeaders()) {
heartbeatResp
.getJudgedLeaders()
.forEach(
- (consensusGroupId, isLeader) -> {
- if (isLeader) {
+ (consensusGroupId, isLeader) ->
regionGroupCacheMap
.computeIfAbsent(consensusGroupId, empty -> new RegionGroupCache())
- .updateLeader(
- heartbeatResp.getHeartbeatTimestamp(), dataNodeLocation.getDataNodeId());
- }
- });
+ .cacheHeartbeatSample(
+ new RegionHeartbeatSample(
+ heartbeatResp.getHeartbeatTimestamp(),
+ receiveTime,
+ dataNodeLocation.getDataNodeId(),
+ isLeader)));
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
new file mode 100644
index 0000000000..aa97397943
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/handlers/UpdateRegionRouteMapHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.client.handlers;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+
+public class UpdateRegionRouteMapHandler implements AsyncMethodCallback<TSStatus> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpdateRegionRouteMapHandler.class);
+
+ private final TDataNodeLocation dataNodeLocation;
+ private final CountDownLatch latch;
+
+ public UpdateRegionRouteMapHandler(TDataNodeLocation dataNodeLocation, CountDownLatch latch) {
+ this.dataNodeLocation = dataNodeLocation;
+ this.latch = latch;
+ }
+
+ @Override
+ public void onComplete(TSStatus status) {
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", dataNodeLocation);
+ } else {
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+ }
+ latch.countDown();
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.error("Update RegionRouteMap on DataNode: {} failed", dataNodeLocation);
+ latch.countDown();
+ }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 6b8588d924..4758921fc4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -220,7 +220,7 @@ public class ConfigManager implements IManager {
Comparator.comparingInt(dataNodeInfoLocation -> dataNodeInfoLocation.getDataNodeId()));
Map<Integer, String> nodeStatus = new HashMap<>();
getLoadManager()
- .getHeartbeatCacheMap()
+ .getNodeCacheMap()
.forEach(
(nodeId, heartbeatCache) ->
nodeStatus.put(nodeId, heartbeatCache.getNodeStatus().getStatus()));
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index 01cea856f1..bac86cbe09 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -187,11 +187,7 @@ public class NodeManager {
int dataNodeId = dataNodeInfo.getLocation().getDataNodeId();
tDataNodesLocation.setDataNodeId(dataNodeId);
tDataNodesLocation.setStatus(
- getLoadManager()
- .getHeartbeatCacheMap()
- .get(dataNodeId)
- .getNodeStatus()
- .getStatus());
+ getLoadManager().getNodeCacheMap().get(dataNodeId).getNodeStatus().getStatus());
tDataNodesLocation.setRpcAddresss(
dataNodeInfo.getLocation().getClientRpcEndPoint().getIp());
tDataNodesLocation.setRpcPort(
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b5ea29f5b0..348744ff8c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.client.AsyncConfigNodeClientPool;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.handlers.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.handlers.DataNodeHeartbeatHandler;
+import org.apache.iotdb.confignode.client.handlers.UpdateRegionRouteMapHandler;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionGroupsPlan;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
@@ -48,9 +51,10 @@ import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
-import org.apache.iotdb.confignode.manager.load.heartbeat.IHeartbeatStatistic;
+import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,9 +63,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -73,14 +79,16 @@ public class LoadManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
- private final IManager configManager;
+ private static final ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+ private static final long heartbeatInterval = conf.getHeartbeatInterval();
+ public static final TEndPoint currentNode =
+ new TEndPoint(conf.getInternalAddress(), conf.getInternalPort());
- private final long heartbeatInterval =
- ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
+ private final IManager configManager;
/** Heartbeat sample cache */
// Map<NodeId, IHeartbeatStatistic>
- private final Map<Integer, IHeartbeatStatistic> heartbeatCacheMap;
+ private final Map<Integer, INodeCache> nodeCacheMap;
// Map<RegionId, RegionGroupCache>
private final Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap;
@@ -108,7 +116,7 @@ public class LoadManager {
public LoadManager(IManager configManager) {
this.configManager = configManager;
- this.heartbeatCacheMap = new ConcurrentHashMap<>();
+ this.nodeCacheMap = new ConcurrentHashMap<>();
this.regionGroupCacheMap = new ConcurrentHashMap<>();
this.regionBalancer = new RegionBalancer(configManager);
@@ -183,7 +191,7 @@ public class LoadManager {
public Map<Integer, Float> getAllLoadScores() {
Map<Integer, Float> result = new ConcurrentHashMap<>();
- heartbeatCacheMap.forEach(
+ nodeCacheMap.forEach(
(dataNodeId, heartbeatCache) -> result.put(dataNodeId, heartbeatCache.getLoadScore()));
return result;
@@ -256,7 +264,57 @@ public class LoadManager {
}
private void updateNodeLoadStatistic() {
- heartbeatCacheMap.values().forEach(IHeartbeatStatistic::updateLoadStatistic);
+ AtomicBoolean isNeedBroadcast = new AtomicBoolean(false);
+
+ nodeCacheMap
+ .values()
+ .forEach(
+ nodeCache -> {
+ boolean updateResult = nodeCache.updateLoadStatistic();
+ if (conf.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)
+ && nodeCache instanceof DataNodeHeartbeatCache) {
+ // We need a broadcast when some DataNode goes down
+ isNeedBroadcast.compareAndSet(false, updateResult);
+ }
+ });
+
+ regionGroupCacheMap
+ .values()
+ .forEach(
+ regionGroupCache -> {
+ boolean updateResult = regionGroupCache.updateLoadStatistic();
+ if (conf.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)) {
+ // We need a broadcast when the leadership changed
+ isNeedBroadcast.compareAndSet(false, updateResult);
+ }
+ });
+
+ if (isNeedBroadcast.get()) {
+ broadcastLatestRegionRouteMap();
+ }
+ }
+
+ private void broadcastLatestRegionRouteMap() {
+ Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = genRealTimeRoutingPolicy();
+ List<TDataNodeInfo> onlineDataNodes = getOnlineDataNodes(-1);
+ CountDownLatch latch = new CountDownLatch(onlineDataNodes.size());
+
+ LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
+
+ onlineDataNodes.forEach(
+ dataNodeInfo ->
+ AsyncDataNodeClientPool.getInstance()
+ .updateRegionRouteMap(
+ dataNodeInfo.getLocation().getInternalEndPoint(),
+ new TRegionRouteReq(System.currentTimeMillis(), latestRegionRouteMap),
+ new UpdateRegionRouteMapHandler(dataNodeInfo.getLocation(), latch)));
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ LOGGER.warn("Broadcast the latest RegionRouteMap was interrupted!");
+ }
+ LOGGER.info("Broadcast the latest RegionRouteMap finished.");
}
private THeartbeatReq genHeartbeatReq() {
@@ -285,7 +343,7 @@ public class LoadManager {
new DataNodeHeartbeatHandler(
dataNodeInfo.getLocation(),
(DataNodeHeartbeatCache)
- heartbeatCacheMap.computeIfAbsent(
+ nodeCacheMap.computeIfAbsent(
dataNodeInfo.getLocation().getDataNodeId(),
empty -> new DataNodeHeartbeatCache()),
regionGroupCacheMap);
@@ -301,15 +359,23 @@ public class LoadManager {
* @param registeredConfigNodes ConfigNodes that registered in cluster
*/
private void pingRegisteredConfigNodes(List<TConfigNodeLocation> registeredConfigNodes) {
+
// Send heartbeat requests
for (TConfigNodeLocation configNodeLocation : registeredConfigNodes) {
+ if (configNodeLocation.getInternalEndPoint().equals(currentNode)) {
+ // Skip itself
+ nodeCacheMap.putIfAbsent(
+ configNodeLocation.getConfigNodeId(), new ConfigNodeHeartbeatCache(configNodeLocation));
+ continue;
+ }
+
ConfigNodeHeartbeatHandler handler =
new ConfigNodeHeartbeatHandler(
configNodeLocation,
(ConfigNodeHeartbeatCache)
- heartbeatCacheMap.computeIfAbsent(
+ nodeCacheMap.computeIfAbsent(
configNodeLocation.getConfigNodeId(),
- empty -> new ConfigNodeHeartbeatCache()));
+ empty -> new ConfigNodeHeartbeatCache(configNodeLocation)));
AsyncConfigNodeClientPool.getInstance()
.getConfigNodeHeartBeat(
configNodeLocation.getInternalEndPoint(),
@@ -324,14 +390,14 @@ public class LoadManager {
* @param nodeId removed node id
*/
public void removeNodeHeartbeatHandCache(Integer nodeId) {
- heartbeatCacheMap.remove(nodeId);
+ nodeCacheMap.remove(nodeId);
}
public List<TConfigNodeLocation> getOnlineConfigNodes() {
return getNodeManager().getRegisteredConfigNodes().stream()
.filter(
registeredConfigNode ->
- heartbeatCacheMap
+ nodeCacheMap
.get(registeredConfigNode.getConfigNodeId())
.getNodeStatus()
.equals(NodeStatus.Running))
@@ -342,7 +408,7 @@ public class LoadManager {
return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
.filter(
registeredDataNode ->
- heartbeatCacheMap
+ nodeCacheMap
.get(registeredDataNode.getLocation().getDataNodeId())
.getNodeStatus()
.equals(NodeStatus.Running))
@@ -365,7 +431,7 @@ public class LoadManager {
return configManager.getPartitionManager();
}
- public Map<Integer, IHeartbeatStatistic> getHeartbeatCacheMap() {
- return heartbeatCacheMap;
+ public Map<Integer, INodeCache> getNodeCacheMap() {
+ return nodeCacheMap;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
index 039dda5583..597a9f944b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/ConfigNodeHeartbeatCache.java
@@ -18,43 +18,52 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import java.util.LinkedList;
-public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
+public class ConfigNodeHeartbeatCache implements INodeCache {
// Cache heartbeat samples
private static final int maximumWindowSize = 100;
- private final LinkedList<HeartbeatPackage> slidingWindow;
+ private final LinkedList<NodeHeartbeatSample> slidingWindow;
+
+ private final TConfigNodeLocation configNodeLocation;
// For showing cluster
private volatile NodeStatus status;
- public ConfigNodeHeartbeatCache() {
+ public ConfigNodeHeartbeatCache(TConfigNodeLocation configNodeLocation) {
+ this.configNodeLocation = configNodeLocation;
this.slidingWindow = new LinkedList<>();
-
this.status = NodeStatus.Running;
}
@Override
- public void cacheHeartBeat(HeartbeatPackage newHeartbeat) {
+ public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
synchronized (slidingWindow) {
// Only sequential heartbeats are accepted.
// And un-sequential heartbeats will be discarded.
if (slidingWindow.size() == 0
- || slidingWindow.getLast().getSendTimestamp() < newHeartbeat.getSendTimestamp()) {
- slidingWindow.add(newHeartbeat);
+ || slidingWindow.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+ slidingWindow.add(newHeartbeatSample);
}
- while (slidingWindow.size() > maximumWindowSize) {
+ if (slidingWindow.size() > maximumWindowSize) {
slidingWindow.removeFirst();
}
}
}
@Override
- public void updateLoadStatistic() {
+ public boolean updateLoadStatistic() {
+ if (configNodeLocation.getInternalEndPoint().equals(LoadManager.currentNode)) {
+ // We don't need to update itself
+ return false;
+ }
+
long lastSendTime = 0;
synchronized (slidingWindow) {
if (slidingWindow.size() > 0) {
@@ -62,12 +71,23 @@ public class ConfigNodeHeartbeatCache implements IHeartbeatStatistic {
}
}
- // TODO: Optimize
+ NodeStatus originStatus;
+ switch (status) {
+ case Running:
+ originStatus = NodeStatus.Running;
+ break;
+ case Unknown:
+ default:
+ originStatus = NodeStatus.Unknown;
+ }
+
+ // TODO: Optimize judge logic
if (System.currentTimeMillis() - lastSendTime > 20_000) {
status = NodeStatus.Unknown;
} else {
status = NodeStatus.Running;
}
+ return !status.getStatus().equals(originStatus.getStatus());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
index e02023fd24..a836a6d2e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/DataNodeHeartbeatCache.java
@@ -23,13 +23,13 @@ import org.apache.iotdb.commons.cluster.NodeStatus;
import java.util.LinkedList;
/** DataNodeHeartbeatCache caches and maintains all the heartbeat data */
-public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
+public class DataNodeHeartbeatCache implements INodeCache {
// TODO: This class might be split into DataNodeCache and ConfigNodeCache
// Cache heartbeat samples
private static final int maximumWindowSize = 100;
- private final LinkedList<HeartbeatPackage> slidingWindow;
+ private final LinkedList<NodeHeartbeatSample> slidingWindow;
// For guiding queries, the higher the score the higher the load
private volatile float loadScore;
@@ -44,23 +44,23 @@ public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
}
@Override
- public void cacheHeartBeat(HeartbeatPackage newHeartbeat) {
+ public void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample) {
synchronized (slidingWindow) {
- // Only sequential heartbeats are accepted.
- // And un-sequential heartbeats will be discarded.
+ // Only sequential HeartbeatSamples are accepted.
+ // And un-sequential HeartbeatSamples will be discarded.
if (slidingWindow.size() == 0
- || slidingWindow.getLast().getSendTimestamp() < newHeartbeat.getSendTimestamp()) {
- slidingWindow.add(newHeartbeat);
+ || slidingWindow.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+ slidingWindow.add(newHeartbeatSample);
}
- while (slidingWindow.size() > maximumWindowSize) {
+ if (slidingWindow.size() > maximumWindowSize) {
slidingWindow.removeFirst();
}
}
}
@Override
- public void updateLoadStatistic() {
+ public boolean updateLoadStatistic() {
long lastSendTime = 0;
synchronized (slidingWindow) {
if (slidingWindow.size() > 0) {
@@ -68,13 +68,23 @@ public class DataNodeHeartbeatCache implements IHeartbeatStatistic {
}
}
- // TODO: Optimize
- loadScore = -lastSendTime;
+ NodeStatus originStatus;
+ switch (status) {
+ case Running:
+ originStatus = NodeStatus.Running;
+ break;
+ case Unknown:
+ default:
+ originStatus = NodeStatus.Unknown;
+ }
+
+ // TODO: Optimize judge logic
if (System.currentTimeMillis() - lastSendTime > 20_000) {
status = NodeStatus.Unknown;
} else {
status = NodeStatus.Running;
}
+ return !status.getStatus().equals(originStatus.getStatus());
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
similarity index 77%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
index 413f54792a..68de3916e0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IHeartbeatStatistic.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/INodeCache.java
@@ -21,17 +21,21 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
import org.apache.iotdb.commons.cluster.NodeStatus;
/** All the statistic interfaces that provided by HeartbeatCache */
-public interface IHeartbeatStatistic {
+public interface INodeCache {
/**
- * Cache the newest HeartbeatPackage
+ * Cache the newest HeartbeatSample
*
- * @param newHeartbeat The newest HeartbeatData
+ * @param newHeartbeatSample The newest HeartbeatSample
*/
- void cacheHeartBeat(HeartbeatPackage newHeartbeat);
+ void cacheHeartbeatSample(NodeHeartbeatSample newHeartbeatSample);
- /** Invoking periodically to update node load statistics */
- void updateLoadStatistic();
+ /**
+ * Invoked periodically to update Nodes' load statistics
+ *
+ * @return true if some load statistic changed
+ */
+ boolean updateLoadStatistic();
/** @return The latest load score of a node, the higher the score the higher the load */
float getLoadScore();
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
index 7283665dc5..3c13b7e855 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -21,17 +21,23 @@ package org.apache.iotdb.confignode.manager.load.heartbeat;
public interface IRegionGroupCache {
/**
- * Update RegionGroup's latest leader
+ * Cache the newest HeartbeatSample
*
- * @param timestamp Judging timestamp
- * @param dataNodeId Leader location
+ * @param newHeartbeatSample The newest HeartbeatSample
*/
- void updateLeader(long timestamp, int dataNodeId);
+ void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample);
+
+ /**
+ * Invoked periodically to update RegionGroups' load statistics
+ *
+ * @return true if some load statistic changed
+ */
+ boolean updateLoadStatistic();
/**
* Get RegionGroup's latest leader
*
- * @return The DataNodeId of latest leader
+ * @return The DataNodeId of the latest leader
*/
int getLeaderDataNodeId();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
similarity index 89%
copy from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
copy to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
index 66327611aa..509642c2fc 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/NodeHeartbeatSample.java
@@ -18,13 +18,15 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
-public class HeartbeatPackage {
+public class NodeHeartbeatSample {
// Unit: ms
private final long sendTimestamp;
private final long receiveTimestamp;
- public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+ // TODO: Add load sample
+
+ public NodeHeartbeatSample(long sendTimestamp, long receiveTimestamp) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index 6ec938ccbc..ce799864a6 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -18,26 +18,75 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class RegionGroupCache implements IRegionGroupCache {
// TODO: This class might be split into SchemaRegionGroupCache and DataRegionGroupCache
- private long timestamp;
+ private static final int maximumWindowSize = 100;
+ // Map<DataNodeId(where a RegionReplica resides), LinkedList<RegionHeartbeatSample>>
+ private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
+ // Indicates the version of the statistics
+ private final AtomicLong versionTimestamp;
+ // The DataNode where the leader resides
private final AtomicInteger leaderDataNodeId;
public RegionGroupCache() {
+ this.slidingWindow = new ConcurrentHashMap<>();
+
+ this.versionTimestamp = new AtomicLong(0);
this.leaderDataNodeId = new AtomicInteger(-1);
}
@Override
- public synchronized void updateLeader(long timestamp, int dataNodeId) {
- if (timestamp > this.timestamp) {
- this.timestamp = timestamp;
- this.leaderDataNodeId.set(dataNodeId);
+ public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample) {
+ slidingWindow.putIfAbsent(newHeartbeatSample.getBelongedDataNodeId(), new LinkedList<>());
+ synchronized (slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId())) {
+ LinkedList<RegionHeartbeatSample> samples =
+ slidingWindow.get(newHeartbeatSample.getBelongedDataNodeId());
+
+ // Only sequential HeartbeatSamples are accepted.
+ // And un-sequential HeartbeatSamples will be discarded.
+ if (samples.size() == 0
+ || samples.getLast().getSendTimestamp() < newHeartbeatSample.getSendTimestamp()) {
+ samples.add(newHeartbeatSample);
+ }
+
+ if (samples.size() > maximumWindowSize) {
+ samples.removeFirst();
+ }
+ }
+ }
+
+ @Override
+ public boolean updateLoadStatistic() {
+ long updateVersion = Long.MIN_VALUE;
+ int updateLeaderDataNodeId = -1;
+
+ synchronized (slidingWindow) {
+ for (LinkedList<RegionHeartbeatSample> samples : slidingWindow.values()) {
+ if (samples.size() > 0) {
+ RegionHeartbeatSample lastSample = samples.getLast();
+ if (lastSample.getSendTimestamp() > updateVersion && lastSample.isLeader()) {
+ updateVersion = lastSample.getSendTimestamp();
+ updateLeaderDataNodeId = lastSample.getBelongedDataNodeId();
+ }
+ }
+ }
+ }
+
+ if (updateVersion > versionTimestamp.get()) {
+ versionTimestamp.set(updateVersion);
+ leaderDataNodeId.set(updateLeaderDataNodeId);
+ return true;
}
+ return false;
}
@Override
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
similarity index 71%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
index 66327611aa..8631c2edbf 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/HeartbeatPackage.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionHeartbeatSample.java
@@ -18,15 +18,24 @@
*/
package org.apache.iotdb.confignode.manager.load.heartbeat;
-public class HeartbeatPackage {
+public class RegionHeartbeatSample {
// Unit: ms
private final long sendTimestamp;
private final long receiveTimestamp;
- public HeartbeatPackage(long sendTimestamp, long receiveTimestamp) {
+ private final int belongedDataNodeId;
+ private final boolean isLeader;
+
+ // TODO: Add load sample
+
+ public RegionHeartbeatSample(
+ long sendTimestamp, long receiveTimestamp, int belongedDataNodeId, boolean isLeader) {
this.sendTimestamp = sendTimestamp;
this.receiveTimestamp = receiveTimestamp;
+
+ this.belongedDataNodeId = belongedDataNodeId;
+ this.isLeader = isLeader;
}
public long getSendTimestamp() {
@@ -36,4 +45,12 @@ public class HeartbeatPackage {
public long getReceiveTimestamp() {
return receiveTimestamp;
}
+
+ public int getBelongedDataNodeId() {
+ return belongedDataNodeId;
+ }
+
+ public boolean isLeader() {
+ return isLeader;
+ }
}