You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/16 15:26:12 UTC
[iotdb] branch master updated: [IOTDB-3181] Region expansion based on the total number of cpu cores (#5919)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d91044dda [IOTDB-3181] Region expansion based on the total number of cpu cores (#5919)
7d91044dda is described below
commit 7d91044ddac135287521d698de3f19b31794789e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Mon May 16 23:26:06 2022 +0800
[IOTDB-3181] Region expansion based on the total number of cpu cores (#5919)
---
.../resources/conf/iotdb-confignode.properties | 4 +-
.../confignode/client/AsyncDataNodeClientPool.java | 128 +++++++++-
.../iotdb/confignode/conf/ConfigNodeConf.java | 4 +-
.../consensus/request/write/CreateRegionsReq.java | 32 ++-
.../request/write/RegisterDataNodeReq.java | 22 +-
...deLocationsResp.java => DataNodeInfosResp.java} | 18 +-
.../iotdb/confignode/manager/ConfigManager.java | 4 +-
.../iotdb/confignode/manager/NodeManager.java | 40 ++-
.../iotdb/confignode/manager/PartitionManager.java | 7 +-
.../iotdb/confignode/manager/load/LoadManager.java | 267 ++++++++-------------
.../manager/load/balancer/RegionBalancer.java | 94 +++++++-
.../allocator/CopySetRegionAllocator.java | 25 +-
.../{ => balancer}/allocator/IRegionAllocator.java | 6 +-
.../confignode/persistence/ClusterSchemaInfo.java | 27 ++-
.../iotdb/confignode/persistence/NodeInfo.java | 79 +++---
.../confignode/persistence/PartitionInfo.java | 10 +-
.../impl/DeleteStorageGroupProcedure.java | 13 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 13 +-
.../consensus/request/ConfigRequestSerDeTest.java | 9 +-
.../iotdb/confignode/persistence/NodeInfoTest.java | 12 +-
.../thrift/ConfigNodeRPCServiceProcessorTest.java | 37 +--
.../commons/utils/ThriftCommonsSerDeUtils.java | 19 ++
.../apache/iotdb/db/client/ConfigNodeClient.java | 7 +-
.../java/org/apache/iotdb/db/service/DataNode.java | 9 +-
thrift-commons/src/main/thrift/common.thrift | 5 +-
.../src/main/thrift/confignode.thrift | 11 +-
26 files changed, 552 insertions(+), 350 deletions(-)
diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index 2ca585b2d1..86ec5c1cf8 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -237,8 +237,8 @@ target_confignode=0.0.0.0:22277
# The heartbeat interval in milliseconds, default is 3000ms
# Datatype: long
-# heartbeat_interval=3000
+# heartbeat_interval=1000
# This parameter only exists for a few days
-# enable_heartbeat=false
\ No newline at end of file
+# enable_heartbeat=true
\ No newline at end of file
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
index 3e156986ae..96d2fe5f1e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/AsyncDataNodeClientPool.java
@@ -18,12 +18,16 @@
*/
package org.apache.iotdb.confignode.client;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
@@ -32,6 +36,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
public class AsyncDataNodeClientPool {
@@ -47,12 +56,120 @@ public class AsyncDataNodeClientPool {
new ConfigNodeClientPoolFactory.AsyncDataNodeInternalServiceClientPoolFactory());
}
+ /**
+ * Execute CreateRegionsReq asynchronously
+ *
+ * @param createRegionsReq CreateRegionsReq
+ * @param ttlMap Map<StorageGroupName, TTL>
+ */
+ public void createRegions(CreateRegionsReq createRegionsReq, Map<String, Long> ttlMap) {
+
+ // Index of each Region
+ int index = 0;
+ // Number of regions to be created
+ int regionNum = 0;
+ // Map<TConsensusGroupId, Map<DataNodeId, index>>
+ Map<TConsensusGroupId, Map<Integer, Integer>> indexMap = new TreeMap<>();
+
+ // Assign an independent index to each Region
+ for (Map.Entry<String, List<TRegionReplicaSet>> entry :
+ createRegionsReq.getRegionMap().entrySet()) {
+ for (TRegionReplicaSet regionReplicaSet : entry.getValue()) {
+ regionNum += regionReplicaSet.getDataNodeLocationsSize();
+ for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
+ indexMap
+ .computeIfAbsent(regionReplicaSet.getRegionId(), idMap -> new TreeMap<>())
+ .put(dataNodeLocation.getDataNodeId(), index);
+ index += 1;
+ }
+ }
+ }
+
+ BitSet bitSet = new BitSet(regionNum);
+ for (int retry = 0; retry < 3; retry++) {
+ CountDownLatch latch = new CountDownLatch(regionNum - bitSet.cardinality());
+ createRegionsReq
+ .getRegionMap()
+ .forEach(
+ (storageGroup, regionReplicaSets) -> {
+ // Enumerate each RegionReplicaSet
+ regionReplicaSets.forEach(
+ regionReplicaSet -> {
+ // Enumerate each Region
+ regionReplicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ // Skip those created successfully
+ if (!bitSet.get(
+ indexMap
+ .get(regionReplicaSet.getRegionId())
+ .get(dataNodeLocation.getDataNodeId()))) {
+ TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+ CreateRegionHandler handler =
+ new CreateRegionHandler(
+ indexMap
+ .get(regionReplicaSet.getRegionId())
+ .get(dataNodeLocation.getDataNodeId()),
+ bitSet,
+ latch,
+ regionReplicaSet.getRegionId(),
+ dataNodeLocation);
+
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ createSchemaRegion(
+ endPoint,
+ genCreateSchemaRegionReq(storageGroup, regionReplicaSet),
+ handler);
+ break;
+ case DataRegion:
+ createDataRegion(
+ endPoint,
+ genCreateDataRegionReq(
+ storageGroup,
+ regionReplicaSet,
+ ttlMap.get(storageGroup)),
+ handler);
+ }
+ }
+ });
+ });
+ });
+
+ try {
+ // Waiting until this batch of create requests done
+ latch.await();
+ } catch (InterruptedException e) {
+ LOGGER.error("ClusterSchemaManager was interrupted during create Regions on DataNodes", e);
+ }
+
+ if (bitSet.cardinality() == regionNum) {
+ // Break if all creations success
+ break;
+ }
+ }
+
+ if (bitSet.cardinality() < regionNum) {
+ LOGGER.error(
+ "Failed to create some SchemaRegions or DataRegions on DataNodes. Please check former logs.");
+ }
+ }
+
+ private TCreateSchemaRegionReq genCreateSchemaRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet) {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ return req;
+ }
+
/**
* Create a SchemaRegion on specific DataNode
*
* @param endPoint The specific DataNode
*/
- public void createSchemaRegion(
+ private void createSchemaRegion(
TEndPoint endPoint, TCreateSchemaRegionReq req, CreateRegionHandler handler) {
AsyncDataNodeInternalServiceClient client;
try {
@@ -65,6 +182,15 @@ public class AsyncDataNodeClientPool {
}
}
+ private TCreateDataRegionReq genCreateDataRegionReq(
+ String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup(storageGroup);
+ req.setRegionReplicaSet(regionReplicaSet);
+ req.setTtl(TTL);
+ return req;
+ }
+
/**
* Create a DataRegion on specific DataNode
*
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
index 923fe5c700..48e830d3e5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConf.java
@@ -129,10 +129,10 @@ public class ConfigNodeConf {
Math.max(Runtime.getRuntime().availableProcessors() / 4, 16);
/** The heartbeat interval in milliseconds */
- private long heartbeatInterval = 3000;
+ private long heartbeatInterval = 1000;
/** This parameter only exists for a few days */
- private boolean enableHeartbeat = false;
+ private boolean enableHeartbeat = true;
ConfigNodeConf() {
// empty constructor
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
index cc8b142721..5b9a900f31 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/CreateRegionsReq.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
@@ -33,19 +35,21 @@ import java.util.TreeMap;
/** Create regions for specific StorageGroups */
public class CreateRegionsReq extends ConfigRequest {
- private final Map<String, TRegionReplicaSet> regionMap;
+ private final Map<String, List<TRegionReplicaSet>> regionMap;
public CreateRegionsReq() {
super(ConfigRequestType.CreateRegions);
this.regionMap = new TreeMap<>();
}
- public Map<String, TRegionReplicaSet> getRegionMap() {
+ public Map<String, List<TRegionReplicaSet>> getRegionMap() {
return regionMap;
}
public void addRegion(String storageGroup, TRegionReplicaSet regionReplicaSet) {
- regionMap.put(storageGroup, regionReplicaSet);
+ regionMap
+ .computeIfAbsent(storageGroup, regionReplicaSets -> new ArrayList<>())
+ .add(regionReplicaSet);
}
@Override
@@ -54,20 +58,28 @@ public class CreateRegionsReq extends ConfigRequest {
buffer.putInt(regionMap.size());
regionMap.forEach(
- (storageGroup, regionReplicaSet) -> {
+ (storageGroup, regionReplicaSets) -> {
BasicStructureSerDeUtil.write(storageGroup, buffer);
- ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, buffer);
+ buffer.putInt(regionReplicaSets.size());
+ regionReplicaSets.forEach(
+ regionReplicaSet ->
+ ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, buffer));
});
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
- int length = buffer.getInt();
- for (int i = 0; i < length; i++) {
+ int storageGroupNum = buffer.getInt();
+ for (int i = 0; i < storageGroupNum; i++) {
String storageGroup = BasicStructureSerDeUtil.readString(buffer);
- TRegionReplicaSet regionReplicaSet =
- ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
- regionMap.put(storageGroup, regionReplicaSet);
+ regionMap.put(storageGroup, new ArrayList<>());
+
+ int regionReplicaSetNum = buffer.getInt();
+ for (int j = 0; j < regionReplicaSetNum; j++) {
+ TRegionReplicaSet regionReplicaSet =
+ ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer);
+ regionMap.get(storageGroup).add(regionReplicaSet);
+ }
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
index b03fef96a4..8800891207 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/RegisterDataNodeReq.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.consensus.request.write;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.consensus.request.ConfigRequest;
import org.apache.iotdb.confignode.consensus.request.ConfigRequestType;
@@ -28,42 +28,42 @@ import java.util.Objects;
public class RegisterDataNodeReq extends ConfigRequest {
- private TDataNodeLocation location;
+ private TDataNodeInfo info;
public RegisterDataNodeReq() {
super(ConfigRequestType.RegisterDataNode);
}
- public RegisterDataNodeReq(TDataNodeLocation location) {
+ public RegisterDataNodeReq(TDataNodeInfo info) {
this();
- this.location = location;
+ this.info = info;
}
- public TDataNodeLocation getLocation() {
- return location;
+ public TDataNodeInfo getInfo() {
+ return info;
}
@Override
protected void serializeImpl(ByteBuffer buffer) {
buffer.putInt(ConfigRequestType.RegisterDataNode.ordinal());
- ThriftCommonsSerDeUtils.serializeTDataNodeLocation(location, buffer);
+ ThriftCommonsSerDeUtils.serializeTDataNodeInfo(info, buffer);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) {
- location = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer);
+ info = ThriftCommonsSerDeUtils.deserializeTDataNodeInfo(buffer);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
- RegisterDataNodeReq plan = (RegisterDataNodeReq) o;
- return location.equals(plan.location);
+ RegisterDataNodeReq that = (RegisterDataNodeReq) o;
+ return info.equals(that.info);
}
@Override
public int hashCode() {
- return Objects.hash(location);
+ return Objects.hash(info);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeInfosResp.java
similarity index 70%
rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeInfosResp.java
index 0331e40371..7718d47131 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeLocationsResp.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/DataNodeInfosResp.java
@@ -18,20 +18,20 @@
*/
package org.apache.iotdb.confignode.consensus.response;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.Map;
-public class DataNodeLocationsResp implements DataSet {
+public class DataNodeInfosResp implements DataSet {
private TSStatus status;
- private Map<Integer, TDataNodeLocation> dataNodeLocationMap;
+ private Map<Integer, TDataNodeInfo> dataNodeInfoMap;
- public DataNodeLocationsResp() {
+ public DataNodeInfosResp() {
// empty constructor
}
@@ -43,14 +43,14 @@ public class DataNodeLocationsResp implements DataSet {
return status;
}
- public void setDataNodeLocations(Map<Integer, TDataNodeLocation> dataNodeLocationMap) {
- this.dataNodeLocationMap = dataNodeLocationMap;
+ public void setDataNodeInfoMap(Map<Integer, TDataNodeInfo> dataNodeInfoMap) {
+ this.dataNodeInfoMap = dataNodeInfoMap;
}
- public void convertToRpcDataNodeLocationResp(TDataNodeLocationResp resp) {
+ public void convertToRpcDataNodeLocationResp(TDataNodeInfoResp resp) {
resp.setStatus(status);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- resp.setDataNodeLocationMap(dataNodeLocationMap);
+ resp.setDataNodeInfoMap(dataNodeInfoMap);
}
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 69561b7fcd..7324a8a1cb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -43,7 +43,7 @@ import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.consensus.response.CountStorageGroupResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -158,7 +158,7 @@ public class ConfigManager implements Manager {
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.getDataNodeInfo(getDataNodeInfoReq);
} else {
- DataNodeLocationsResp dataSet = new DataNodeLocationsResp();
+ DataNodeInfosResp dataSet = new DataNodeInfosResp();
dataSet.setStatus(status);
return dataSet;
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
index ea56c4cf09..ba046e531a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/NodeManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -28,7 +29,7 @@ import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
@@ -83,21 +84,22 @@ public class NodeManager {
public DataSet registerDataNode(RegisterDataNodeReq req) {
DataNodeConfigurationResp dataSet = new DataNodeConfigurationResp();
- if (nodeInfo.containsValue(req.getLocation())) {
+ if (nodeInfo.isOnlineDataNode(req.getInfo().getLocation())) {
// Reset client
- AsyncDataNodeClientPool.getInstance().resetClient(req.getLocation().getInternalEndPoint());
+ AsyncDataNodeClientPool.getInstance()
+ .resetClient(req.getInfo().getLocation().getInternalEndPoint());
TSStatus status = new TSStatus(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
status.setMessage("DataNode already registered.");
dataSet.setStatus(status);
} else {
// Persist DataNodeInfo
- req.getLocation().setDataNodeId(nodeInfo.generateNextDataNodeId());
+ req.getInfo().getLocation().setDataNodeId(nodeInfo.generateNextDataNodeId());
ConsensusWriteResponse resp = getConsensusManager().write(req);
dataSet.setStatus(resp.getStatus());
}
- dataSet.setDataNodeId(req.getLocation().getDataNodeId());
+ dataSet.setDataNodeId(req.getInfo().getLocation().getDataNodeId());
dataSet.setConfigNodeList(nodeInfo.getOnlineConfigNodes());
setGlobalConfig(dataSet);
return dataSet;
@@ -110,31 +112,19 @@ public class NodeManager {
* @return The specific DataNode's info or all DataNode info if dataNodeId in
* QueryDataNodeInfoPlan is -1
*/
- public DataNodeLocationsResp getDataNodeInfo(GetDataNodeInfoReq req) {
- return (DataNodeLocationsResp) getConsensusManager().read(req).getDataset();
- }
-
- public int getOnlineDataNodeCount() {
- return nodeInfo.getOnlineDataNodeCount();
- }
-
- /**
- * Only leader use this interface.
- *
- * @return all online DataNodes
- */
- public List<TDataNodeLocation> getOnlineDataNodes() {
- return nodeInfo.getOnlineDataNodes();
+ public DataNodeInfosResp getDataNodeInfo(GetDataNodeInfoReq req) {
+ return (DataNodeInfosResp) getConsensusManager().read(req).getDataset();
}
/**
- * Only leader use this interface.
+ * Only leader use this interface
*
- * @param dataNodeId the specific DataNodeId
- * @return the specific DataNodeLocation
+ * @param dataNodeId Specific DataNodeId
+ * @return All online DataNodes if dataNodeId equals -1. And return the specific DataNode
+ * otherwise.
*/
- public TDataNodeLocation getOnlineDataNode(int dataNodeId) {
- return nodeInfo.getOnlineDataNode(dataNodeId);
+ public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+ return nodeInfo.getOnlineDataNodes(dataNodeId);
}
/**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 5d90ab1551..6ce4d6dec9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.confignode.consensus.request.read.GetOrCreateSchemaParti
import org.apache.iotdb.confignode.consensus.request.read.GetSchemaPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateDataPartitionReq;
import org.apache.iotdb.confignode.consensus.request.write.CreateSchemaPartitionReq;
-import org.apache.iotdb.confignode.consensus.request.write.DeleteRegionsReq;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
@@ -290,7 +289,7 @@ public class PartitionManager {
storageGroupWithoutRegion.add(storageGroup);
}
}
- getLoadManager().allocateAndCreateRegions(storageGroupWithoutRegion, consensusGroupType);
+ getLoadManager().initializeRegions(storageGroupWithoutRegion, consensusGroupType);
}
/** Get all allocated RegionReplicaSets */
@@ -346,8 +345,4 @@ public class PartitionManager {
private LoadManager getLoadManager() {
return configManager.getLoadManager();
}
-
- public TSStatus deleteRegions(DeleteRegionsReq deleteRegionsReq) {
- return getConsensusManager().write(deleteRegionsReq).getStatus();
- }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index cae71a74e8..55654b1437 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -20,13 +20,11 @@ package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.confignode.client.AsyncDataNodeClientPool;
-import org.apache.iotdb.confignode.client.handlers.CreateRegionHandler;
import org.apache.iotdb.confignode.client.handlers.HeartbeatHandler;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
@@ -35,22 +33,18 @@ import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.Manager;
import org.apache.iotdb.confignode.manager.NodeManager;
-import org.apache.iotdb.confignode.manager.PartitionManager;
-import org.apache.iotdb.confignode.manager.load.allocator.CopySetRegionAllocator;
-import org.apache.iotdb.confignode.manager.load.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer;
import org.apache.iotdb.confignode.manager.load.heartbeat.HeartbeatCache;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.BitSet;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
/**
@@ -61,37 +55,41 @@ public class LoadManager implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadManager.class);
- private final long heartbeatInterval =
- ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
-
private final Manager configManager;
+ private final long heartbeatInterval =
+ ConfigNodeDescriptor.getInstance().getConf().getHeartbeatInterval();
private final HeartbeatCache heartbeatCache;
- private final IRegionAllocator regionAllocator;
+ private final RegionBalancer regionBalancer;
+
+ private final Map<TConsensusGroupId, TRegionReplicaSet> replicaScoreMap;
// TODO: Interfaces for active, interrupt and reset LoadBalancer
public LoadManager(Manager configManager) {
this.configManager = configManager;
this.heartbeatCache = new HeartbeatCache();
- this.regionAllocator = new CopySetRegionAllocator();
+
+ this.regionBalancer = new RegionBalancer(configManager);
+
+ this.replicaScoreMap = new TreeMap<>();
}
/**
- * Allocate and create one Region on DataNode for each StorageGroup.
+ * Allocate and create one Region for each StorageGroup. TODO: Use procedure to protect create
+ * Regions process
*
* @param storageGroups List<StorageGroupName>
* @param consensusGroupType TConsensusGroupType of Region to be allocated
*/
- public void allocateAndCreateRegions(
- List<String> storageGroups, TConsensusGroupType consensusGroupType)
+ public void initializeRegions(List<String> storageGroups, TConsensusGroupType consensusGroupType)
throws NotEnoughDataNodeException {
CreateRegionsReq createRegionsReq = null;
- // TODO: use procedure to protect create Regions process
try {
- createRegionsReq = allocateRegions(storageGroups, consensusGroupType);
+ createRegionsReq =
+ regionBalancer.genRegionsAllocationPlan(storageGroups, consensusGroupType, 1);
createRegionsOnDataNodes(createRegionsReq);
} catch (MetadataException e) {
LOGGER.error("Meet error when create Regions", e);
@@ -100,179 +98,106 @@ public class LoadManager implements Runnable {
getConsensusManager().write(createRegionsReq);
}
- private CreateRegionsReq allocateRegions(
- List<String> storageGroups, TConsensusGroupType consensusGroupType)
- throws NotEnoughDataNodeException, MetadataException {
- CreateRegionsReq createRegionsReq = new CreateRegionsReq();
-
- List<TDataNodeLocation> onlineDataNodes = getNodeManager().getOnlineDataNodes();
- List<TRegionReplicaSet> allocatedRegions = getPartitionManager().getAllocatedRegions();
-
- for (String storageGroup : storageGroups) {
- TStorageGroupSchema storageGroupSchema =
- getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
- int replicationFactor =
- consensusGroupType == TConsensusGroupType.SchemaRegion
- ? storageGroupSchema.getSchemaReplicationFactor()
- : storageGroupSchema.getDataReplicationFactor();
-
- if (onlineDataNodes.size() < replicationFactor) {
- throw new NotEnoughDataNodeException();
- }
-
- TRegionReplicaSet newRegion =
- regionAllocator.allocateRegion(
- onlineDataNodes,
- allocatedRegions,
- replicationFactor,
- new TConsensusGroupId(
- consensusGroupType, getPartitionManager().generateNextRegionGroupId()));
- createRegionsReq.addRegion(storageGroup, newRegion);
- }
-
- return createRegionsReq;
- }
-
+ /**
+ * Send the Regions described by createRegionsReq to the DataNodes through
+ * AsyncDataNodeClientPool, together with the TTL of every StorageGroup in the request.
+ *
+ * @param createRegionsReq Regions to be created, keyed by StorageGroup
+ * @throws MetadataException If looking up a StorageGroup's schema (needed for its TTL) fails
+ */
private void createRegionsOnDataNodes(CreateRegionsReq createRegionsReq)
throws MetadataException {
- // Index of each Region
- int index = 0;
- // Number of regions to be created
- int regionNum = 0;
- Map<String, Map<Integer, Integer>> indexMap = new HashMap<>();
Map<String, Long> ttlMap = new HashMap<>();
- for (Map.Entry<String, TRegionReplicaSet> entry : createRegionsReq.getRegionMap().entrySet()) {
- regionNum += entry.getValue().getDataNodeLocationsSize();
+ for (String storageGroup : createRegionsReq.getRegionMap().keySet()) {
ttlMap.put(
- entry.getKey(),
- getClusterSchemaManager().getStorageGroupSchemaByName(entry.getKey()).getTTL());
- for (TDataNodeLocation dataNodeLocation : entry.getValue().getDataNodeLocations()) {
- indexMap
- .computeIfAbsent(entry.getKey(), sg -> new HashMap<>())
- .put(dataNodeLocation.getDataNodeId(), index);
- index += 1;
- }
+ storageGroup,
+ getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
}
+ AsyncDataNodeClientPool.getInstance().createRegions(createRegionsReq, ttlMap);
+ }
- BitSet bitSet = new BitSet(regionNum);
-
- for (int retry = 0; retry < 3; retry++) {
- CountDownLatch latch = new CountDownLatch(regionNum - bitSet.cardinality());
+ /** Build a heartbeat request stamped with System.currentTimeMillis(). */
+ private THeartbeatReq genHeartbeatReq() {
+ return new THeartbeatReq(System.currentTimeMillis());
+ }
- createRegionsReq
- .getRegionMap()
- .forEach(
- (storageGroup, regionReplicaSet) -> {
- // Enumerate each Region
- regionReplicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation -> {
- // Skip those created successfully
- if (!bitSet.get(
- indexMap.get(storageGroup).get(dataNodeLocation.getDataNodeId()))) {
- TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
- CreateRegionHandler handler =
- new CreateRegionHandler(
- indexMap
- .get(storageGroup)
- .get(dataNodeLocation.getDataNodeId()),
- bitSet,
- latch,
- regionReplicaSet.getRegionId(),
- dataNodeLocation);
+ /**
+ * Expand the Regions of each StorageGroup while its total number of Region replicas is below
+ * half of the CPU cores reported by all online DataNodes. When a StorageGroup has fewer than five
+ * DataRegions per SchemaRegion, DataRegions are added (bounded by the remaining replica budget);
+ * otherwise a single SchemaRegion is added.
+ *
+ * <p>Errors are handled per StorageGroup: a MetadataException is logged and a
+ * NotEnoughDataNodeException is ignored, so the remaining StorageGroups are still processed.
+ */
+ private void regionExpansion() {
+ // Currently, we simply expand the number of regions held by each storage group to
+ // 50% of the total CPU cores to facilitate performance testing of multiple regions
- switch (regionReplicaSet.getRegionId().getType()) {
- case SchemaRegion:
- AsyncDataNodeClientPool.getInstance()
- .createSchemaRegion(
- endPoint,
- genCreateSchemaRegionReq(storageGroup, regionReplicaSet),
- handler);
- break;
- case DataRegion:
- AsyncDataNodeClientPool.getInstance()
- .createDataRegion(
- endPoint,
- genCreateDataRegionReq(
- storageGroup,
- regionReplicaSet,
- ttlMap.get(storageGroup)),
- handler);
- }
- }
- });
- });
+ int totalCoreNum = 0;
+ List<TDataNodeInfo> dataNodeInfos = getNodeManager().getOnlineDataNodes(-1);
+ for (TDataNodeInfo dataNodeInfo : dataNodeInfos) {
+ totalCoreNum += dataNodeInfo.getCpuCoreNum();
+ }
+ List<String> storageGroups = getClusterSchemaManager().getStorageGroupNames();
+ for (String storageGroup : storageGroups) {
try {
- latch.await();
- } catch (InterruptedException e) {
- LOGGER.error("ClusterSchemaManager was interrupted during create Regions on DataNodes", e);
- }
+ TStorageGroupSchema storageGroupSchema =
+ getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
+ int totalReplicaNum =
+ storageGroupSchema.getSchemaReplicationFactor()
+ * storageGroupSchema.getSchemaRegionGroupIdsSize()
+ + storageGroupSchema.getDataReplicationFactor()
+ * storageGroupSchema.getDataRegionGroupIdsSize();
+
+ if (totalReplicaNum < totalCoreNum * 0.5) {
+ // Allocate more Regions
+ CreateRegionsReq createRegionsReq;
+ if (storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+ > storageGroupSchema.getDataRegionGroupIdsSize()) {
+ // TODO: Find an optimal SchemaRegion:DataRegion rate
+ // Currently, we just assume that it's 1:5
+ int regionNum =
+ Math.min(
+ ((int) (totalCoreNum * 0.5) - totalReplicaNum)
+ / storageGroupSchema.getDataReplicationFactor(),
+ storageGroupSchema.getSchemaRegionGroupIdsSize() * 5
+ - storageGroupSchema.getDataRegionGroupIdsSize());
+ if (regionNum <= 0) {
+ // The remaining replica budget cannot hold even one whole DataRegion group; skip this
+ // StorageGroup instead of writing an empty CreateRegionsReq through consensus.
+ continue;
+ }
+ createRegionsReq =
+ regionBalancer.genRegionsAllocationPlan(
+ Collections.singletonList(storageGroup),
+ TConsensusGroupType.DataRegion,
+ regionNum);
+ } else {
+ createRegionsReq =
+ regionBalancer.genRegionsAllocationPlan(
+ Collections.singletonList(storageGroup), TConsensusGroupType.SchemaRegion, 1);
+ }
- if (bitSet.cardinality() == regionNum) {
- break;
+ // TODO: use procedure to protect this
+ createRegionsOnDataNodes(createRegionsReq);
+ getConsensusManager().write(createRegionsReq);
+ }
+ } catch (MetadataException e) {
+ LOGGER.warn("Meet error when doing regionExpansion", e);
+ } catch (NotEnoughDataNodeException ignore) {
+ // The LoadManager will expand Regions automatically after there are enough DataNodes.
}
}
-
- if (bitSet.cardinality() < regionNum) {
- LOGGER.error(
- "Failed to create some SchemaRegions or DataRegions on DataNodes. Please check former logs.");
- }
}
- private TCreateSchemaRegionReq genCreateSchemaRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet) {
- TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- return req;
- }
-
- private TCreateDataRegionReq genCreateDataRegionReq(
- String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
- TCreateDataRegionReq req = new TCreateDataRegionReq();
- req.setStorageGroup(storageGroup);
- req.setRegionReplicaSet(regionReplicaSet);
- req.setTtl(TTL);
- return req;
- }
-
- private ConsensusManager getConsensusManager() {
- return configManager.getConsensusManager();
- }
-
- private NodeManager getNodeManager() {
- return configManager.getNodeManager();
- }
-
- private ClusterSchemaManager getClusterSchemaManager() {
- return configManager.getClusterSchemaManager();
- }
-
- private PartitionManager getPartitionManager() {
- return configManager.getPartitionManager();
- }
-
- private THeartbeatReq genHeartbeatReq() {
- return new THeartbeatReq(System.currentTimeMillis());
+ /** Run one round of load balancing; currently this only performs Region expansion. */
+ private void doLoadBalancing() {
+ regionExpansion();
+ // TODO: update replicaScoreMap
}
@Override
public void run() {
+ int balanceCount = 0;
while (true) {
try {
if (getConsensusManager().isLeader()) {
// Ask DataNode for heartbeat in every heartbeat interval
- List<TDataNodeLocation> onlineDataNodes = getNodeManager().getOnlineDataNodes();
- for (TDataNodeLocation dataNodeLocation : onlineDataNodes) {
+ List<TDataNodeInfo> onlineDataNodes = getNodeManager().getOnlineDataNodes(-1);
+ for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
HeartbeatHandler handler =
- new HeartbeatHandler(dataNodeLocation.getDataNodeId(), heartbeatCache);
+ new HeartbeatHandler(dataNodeInfo.getLocation().getDataNodeId(), heartbeatCache);
AsyncDataNodeClientPool.getInstance()
- .getHeartBeat(dataNodeLocation.getInternalEndPoint(), genHeartbeatReq(), handler);
+ .getHeartBeat(
+ dataNodeInfo.getLocation().getInternalEndPoint(), genHeartbeatReq(), handler);
}
+ balanceCount += 1;
+ // TODO: Adjust load balancing period
+ if (balanceCount == 10) {
+ doLoadBalancing();
+ balanceCount = 0;
+ }
} else {
heartbeatCache.discardAllCache();
}
@@ -284,4 +209,16 @@ public class LoadManager implements Runnable {
}
}
}
+
+ /** @return The ConsensusManager obtained from configManager. */
+ private ConsensusManager getConsensusManager() {
+ return configManager.getConsensusManager();
+ }
+
+ /** @return The NodeManager obtained from configManager. */
+ private NodeManager getNodeManager() {
+ return configManager.getNodeManager();
+ }
+
+ /** @return The ClusterSchemaManager obtained from configManager. */
+ private ClusterSchemaManager getClusterSchemaManager() {
+ return configManager.getClusterSchemaManager();
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
index 2e099883d2..cfc23769da 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java
@@ -18,4 +18,96 @@
*/
package org.apache.iotdb.confignode.manager.load.balancer;
-public class RegionBalancer {}
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.confignode.consensus.request.write.CreateRegionsReq;
+import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException;
+import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
+import org.apache.iotdb.confignode.manager.Manager;
+import org.apache.iotdb.confignode.manager.NodeManager;
+import org.apache.iotdb.confignode.manager.PartitionManager;
+import org.apache.iotdb.confignode.manager.load.balancer.allocator.CopySetRegionAllocator;
+import org.apache.iotdb.confignode.manager.load.balancer.allocator.IRegionAllocator;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
+
+import java.util.List;
+
+/**
+ * The RegionBalancer provides interfaces to generate optimal Region allocation and migration plans
+ */
+public class RegionBalancer {
+
+  /** Access point to the Node, Partition and ClusterSchema managers. */
+  private final Manager configManager;
+
+  /** Strategy that chooses the DataNodes hosting each new Region's replicas. */
+  private final IRegionAllocator regionAllocator;
+
+  public RegionBalancer(Manager configManager) {
+    this.configManager = configManager;
+    // TODO: The RegionAllocator should be configurable
+    this.regionAllocator = new CopySetRegionAllocator();
+  }
+
+  /**
+   * Build a Region allocation plan (CreateRegionsReq) that adds {@code regionNum} Regions of the
+   * given type to every listed StorageGroup. Each Region receives a fresh id from the
+   * PartitionManager, and every planned Region is appended to the list of known Regions so that
+   * later allocations within the same call see it.
+   *
+   * @param storageGroups StorageGroups that receive new Regions
+   * @param consensusGroupType Type of the new Regions; SchemaRegion uses the schema replication
+   *     factor, any other type uses the data replication factor
+   * @param regionNum Number of Regions to allocate for every StorageGroup
+   * @return The allocation plan
+   * @throws NotEnoughDataNodeException If fewer DataNodes are online than the required replication
+   *     factor of some StorageGroup
+   * @throws MetadataException If the schema of some StorageGroup cannot be fetched
+   */
+  public CreateRegionsReq genRegionsAllocationPlan(
+      List<String> storageGroups, TConsensusGroupType consensusGroupType, int regionNum)
+      throws NotEnoughDataNodeException, MetadataException {
+    CreateRegionsReq plan = new CreateRegionsReq();
+
+    List<TDataNodeInfo> candidates = getNodeManager().getOnlineDataNodes(-1);
+    // Regions planned below are appended here so the allocator sees them on later iterations
+    List<TRegionReplicaSet> knownRegions = getPartitionManager().getAllocatedRegions();
+    boolean isSchemaRegion = consensusGroupType == TConsensusGroupType.SchemaRegion;
+
+    for (String storageGroup : storageGroups) {
+      TStorageGroupSchema schema =
+          getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup);
+      int replicationFactor =
+          isSchemaRegion ? schema.getSchemaReplicationFactor() : schema.getDataReplicationFactor();
+      if (replicationFactor > candidates.size()) {
+        throw new NotEnoughDataNodeException();
+      }
+
+      int remaining = regionNum;
+      while (remaining > 0) {
+        TConsensusGroupId regionId =
+            new TConsensusGroupId(
+                consensusGroupType, getPartitionManager().generateNextRegionGroupId());
+        TRegionReplicaSet placement =
+            regionAllocator.allocateRegion(candidates, knownRegions, replicationFactor, regionId);
+        plan.addRegion(storageGroup, placement);
+        knownRegions.add(placement);
+        remaining--;
+      }
+    }
+
+    return plan;
+  }
+
+  private NodeManager getNodeManager() {
+    return configManager.getNodeManager();
+  }
+
+  private ClusterSchemaManager getClusterSchemaManager() {
+    return configManager.getClusterSchemaManager();
+  }
+
+  private PartitionManager getPartitionManager() {
+    return configManager.getPartitionManager();
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/CopySetRegionAllocator.java
similarity index 84%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/CopySetRegionAllocator.java
index 7cdc8adf15..c762b833bd 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/CopySetRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/CopySetRegionAllocator.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.allocator;
+package org.apache.iotdb.confignode.manager.load.balancer.allocator;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -49,7 +50,7 @@ public class CopySetRegionAllocator implements IRegionAllocator {
@Override
public TRegionReplicaSet allocateRegion(
- List<TDataNodeLocation> onlineDataNodes,
+ List<TDataNodeInfo> onlineDataNodes,
List<TRegionReplicaSet> allocatedRegions,
int replicationFactor,
TConsensusGroupId consensusGroupId) {
@@ -79,12 +80,15 @@ public class CopySetRegionAllocator implements IRegionAllocator {
}
private void buildWeightList(
- List<TDataNodeLocation> onlineDataNodes, List<TRegionReplicaSet> allocatedRegions) {
+ List<TDataNodeInfo> onlineDataNodes, List<TRegionReplicaSet> allocatedRegions) {
+
+ // TODO: The remaining disk capacity of each DataNode could also be factored into the weightList
+
int maximumRegionNum = 0;
Map<TDataNodeLocation, Integer> countMap = new HashMap<>();
- for (TDataNodeLocation dataNodeLocation : onlineDataNodes) {
- maxId = Math.max(maxId, dataNodeLocation.getDataNodeId());
- countMap.put(dataNodeLocation, 0);
+ for (TDataNodeInfo dataNodeInfo : onlineDataNodes) {
+ maxId = Math.max(maxId, dataNodeInfo.getLocation().getDataNodeId());
+ countMap.put(dataNodeInfo.getLocation(), 0);
}
for (TRegionReplicaSet regionReplicaSet : allocatedRegions) {
for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) {
@@ -103,6 +107,7 @@ public class CopySetRegionAllocator implements IRegionAllocator {
}
}
+ /** @return A new CopySet generated by weighted random selection */
private TRegionReplicaSet genWeightedRandomRegion(int replicationFactor) {
Set<Integer> checkSet = new HashSet<>();
TRegionReplicaSet randomRegion = new TRegionReplicaSet();
@@ -124,6 +129,14 @@ public class CopySetRegionAllocator implements IRegionAllocator {
return randomRegion;
}
+ /**
+ * Do intersection check.
+ *
+ * @param allocatedRegions Allocated CopySets.
+ * @param newRegion A new CopySet.
+ * @return True if, for every region in allocatedRegions, the size of its intersection with
+ * newRegion does not exceed intersectionSize.
+ */
private boolean intersectionCheck(
List<TRegionReplicaSet> allocatedRegions, TRegionReplicaSet newRegion) {
BitSet newBit = new BitSet(maxId + 1);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/IRegionAllocator.java
similarity index 90%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/IRegionAllocator.java
index 1466fd5d56..a702d9c2e1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/allocator/IRegionAllocator.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/allocator/IRegionAllocator.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.confignode.manager.load.allocator;
+package org.apache.iotdb.confignode.manager.load.balancer.allocator;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import java.util.List;
@@ -37,7 +37,7 @@ public interface IRegionAllocator {
* @return The optimal TRegionReplicaSet derived by the specific algorithm
*/
TRegionReplicaSet allocateRegion(
- List<TDataNodeLocation> onlineDataNodes,
+ List<TDataNodeInfo> onlineDataNodes,
List<TRegionReplicaSet> allocatedRegions,
int replicationFactor,
TConsensusGroupId consensusGroupId);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
index 9bc8e67d59..e9a9001698 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfo.java
@@ -160,18 +160,27 @@ public class ClusterSchemaInfo implements SnapshotProcessor {
storageGroupReadWriteLock.writeLock().lock();
try {
- for (Map.Entry<String, TRegionReplicaSet> reqEntry : req.getRegionMap().entrySet()) {
+ for (Map.Entry<String, List<TRegionReplicaSet>> reqEntry : req.getRegionMap().entrySet()) {
PartialPath partialPathName = new PartialPath(reqEntry.getKey());
TStorageGroupSchema storageGroupSchema =
mTree.getStorageGroupNodeByStorageGroupPath(partialPathName).getStorageGroupSchema();
- switch (reqEntry.getValue().getRegionId().getType()) {
- case SchemaRegion:
- storageGroupSchema.getSchemaRegionGroupIds().add(reqEntry.getValue().getRegionId());
- break;
- case DataRegion:
- storageGroupSchema.getDataRegionGroupIds().add(reqEntry.getValue().getRegionId());
- break;
- }
+ reqEntry
+ .getValue()
+ .forEach(
+ regionReplicaSet -> {
+ switch (regionReplicaSet.getRegionId().getType()) {
+ case SchemaRegion:
+ storageGroupSchema
+ .getSchemaRegionGroupIds()
+ .add(regionReplicaSet.getRegionId());
+ break;
+ case DataRegion:
+ storageGroupSchema
+ .getDataRegionGroupIds()
+ .add(regionReplicaSet.getRegionId());
+ break;
+ }
+ });
}
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
index 343ec9e667..02765058cb 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/NodeInfo.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
@@ -29,7 +30,7 @@ import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeInfoReq;
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
@@ -79,18 +80,15 @@ public class NodeInfo implements SnapshotProcessor {
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor(),
ConfigNodeDescriptor.getInstance().getConf().getDataReplicationFactor());
- private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
-
// Online ConfigNodes
+ private final ReentrantReadWriteLock configNodeInfoReadWriteLock;
private final Set<TConfigNodeLocation> onlineConfigNodes;
+ // Online DataNodes
private final ReentrantReadWriteLock dataNodeInfoReadWriteLock;
-
private AtomicInteger nextDataNodeId = new AtomicInteger(0);
-
- // Online DataNodes
- private final ConcurrentNavigableMap<Integer, TDataNodeLocation> onlineDataNodes =
- new ConcurrentSkipListMap();
+ private final ConcurrentNavigableMap<Integer, TDataNodeInfo> onlineDataNodes =
+ new ConcurrentSkipListMap<>();
// For remove or draining DataNode
// TODO: implement
@@ -105,14 +103,15 @@ public class NodeInfo implements SnapshotProcessor {
new HashSet<>(ConfigNodeDescriptor.getInstance().getConf().getConfigNodeList());
}
- public boolean containsValue(TDataNodeLocation info) {
+ /** @return true if the specific DataNode is now online */
+ public boolean isOnlineDataNode(TDataNodeLocation info) {
boolean result = false;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- for (Map.Entry<Integer, TDataNodeLocation> entry : onlineDataNodes.entrySet()) {
+ for (Map.Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
info.setDataNodeId(entry.getKey());
- if (entry.getValue().equals(info)) {
+ if (entry.getValue().getLocation().equals(info)) {
result = true;
break;
}
@@ -124,10 +123,6 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- public void put(int dataNodeID, TDataNodeLocation info) {
- onlineDataNodes.put(dataNodeID, info);
- }
-
/**
* Persist DataNode info
*
@@ -136,11 +131,12 @@ public class NodeInfo implements SnapshotProcessor {
*/
public TSStatus registerDataNode(RegisterDataNodeReq registerDataNodeReq) {
TSStatus result;
- TDataNodeLocation info = registerDataNodeReq.getLocation();
+ TDataNodeInfo info = registerDataNodeReq.getInfo();
dataNodeInfoReadWriteLock.writeLock().lock();
try {
- onlineDataNodes.put(info.getDataNodeId(), info);
- if (nextDataNodeId.get() < registerDataNodeReq.getLocation().getDataNodeId()) {
+ onlineDataNodes.put(info.getLocation().getDataNodeId(), info);
+
+ if (nextDataNodeId.get() < info.getLocation().getDataNodeId()) {
// In this case, at least one Datanode is registered with the leader node,
// so the nextDataNodeID of the followers needs to be added
nextDataNodeId.getAndIncrement();
@@ -157,7 +153,7 @@ public class NodeInfo implements SnapshotProcessor {
LOGGER.info(
"Successfully register DataNode: {}. Current online DataNodes: {}",
- info,
+ info.getLocation(),
onlineDataNodes);
} finally {
dataNodeInfoReadWriteLock.writeLock().unlock();
@@ -172,18 +168,17 @@ public class NodeInfo implements SnapshotProcessor {
* @return The specific DataNode's info or all DataNode info if dataNodeId in
* QueryDataNodeInfoPlan is -1
*/
- public DataNodeLocationsResp getDataNodeInfo(GetDataNodeInfoReq getDataNodeInfoReq) {
- DataNodeLocationsResp result = new DataNodeLocationsResp();
+ public DataNodeInfosResp getDataNodeInfo(GetDataNodeInfoReq getDataNodeInfoReq) {
+ DataNodeInfosResp result = new DataNodeInfosResp();
result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
int dataNodeId = getDataNodeInfoReq.getDataNodeID();
dataNodeInfoReadWriteLock.readLock().lock();
try {
if (dataNodeId == -1) {
- result.setDataNodeLocations(new HashMap<>(onlineDataNodes));
+ result.setDataNodeInfoMap(new HashMap<>(onlineDataNodes));
} else {
-
- result.setDataNodeLocations(
+ result.setDataNodeInfoMap(
Collections.singletonMap(dataNodeId, onlineDataNodes.get(dataNodeId)));
}
} finally {
@@ -204,22 +199,22 @@ public class NodeInfo implements SnapshotProcessor {
return result;
}
- public List<TDataNodeLocation> getOnlineDataNodes() {
- List<TDataNodeLocation> result;
- dataNodeInfoReadWriteLock.readLock().lock();
- try {
- result = new ArrayList<>(onlineDataNodes.values());
- } finally {
- dataNodeInfoReadWriteLock.readLock().unlock();
- }
- return result;
- }
-
- public TDataNodeLocation getOnlineDataNode(int dataNodeId) {
- TDataNodeLocation result;
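+ /**
+ * Return online DataNodes.
+ *
+ * @param dataNodeId Id of the DataNode to look up, or -1 to request all online DataNodes
+ * @return All online DataNodes if dataNodeId equals -1; otherwise a single-element list holding
+ * that DataNode (the element is null if no DataNode with that id is online).
+ */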
+ public List<TDataNodeInfo> getOnlineDataNodes(int dataNodeId) {
+ List<TDataNodeInfo> result;
dataNodeInfoReadWriteLock.readLock().lock();
try {
- result = onlineDataNodes.get(dataNodeId);
+ if (dataNodeId == -1) {
+ result = new ArrayList<>(onlineDataNodes.values());
+ } else {
+ result = Collections.singletonList(onlineDataNodes.get(dataNodeId));
+ }
} finally {
dataNodeInfoReadWriteLock.readLock().unlock();
}
@@ -316,7 +311,7 @@ public class NodeInfo implements SnapshotProcessor {
private void serializeOnlineDataNode(DataOutputStream outputStream, TProtocol protocol)
throws IOException, TException {
outputStream.writeInt(onlineDataNodes.size());
- for (Entry<Integer, TDataNodeLocation> entry : onlineDataNodes.entrySet()) {
+ for (Entry<Integer, TDataNodeInfo> entry : onlineDataNodes.entrySet()) {
outputStream.writeInt(entry.getKey());
entry.getValue().write(protocol);
}
@@ -368,9 +363,9 @@ public class NodeInfo implements SnapshotProcessor {
int size = inputStream.readInt();
while (size > 0) {
int dataNodeId = inputStream.readInt();
- TDataNodeLocation tDataNodeLocation = new TDataNodeLocation();
- tDataNodeLocation.read(protocol);
- onlineDataNodes.put(dataNodeId, tDataNodeLocation);
+ TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.read(protocol);
+ onlineDataNodes.put(dataNodeId, dataNodeInfo);
size--;
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
index b9af789868..1f6d4fcdad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/PartitionInfo.java
@@ -137,10 +137,12 @@ public class PartitionInfo implements SnapshotProcessor {
try {
int maxRegionId = Integer.MIN_VALUE;
- for (TRegionReplicaSet regionReplicaSet : req.getRegionMap().values()) {
- regionReplicaMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
- regionSlotsCounter.put(regionReplicaSet.getRegionId(), 0L);
- maxRegionId = Math.max(maxRegionId, regionReplicaSet.getRegionId().getId());
+ for (List<TRegionReplicaSet> regionReplicaSets : req.getRegionMap().values()) {
+ for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+ regionReplicaMap.put(regionReplicaSet.getRegionId(), regionReplicaSet);
+ regionSlotsCounter.put(regionReplicaSet.getRegionId(), 0L);
+ maxRegionId = Math.max(maxRegionId, regionReplicaSet.getRegionId().getId());
+ }
}
if (nextRegionGroupId.get() < maxRegionId) {
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
index edea9369a6..3566381500 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/DeleteStorageGroupProcedure.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
@@ -193,14 +193,15 @@ public class DeleteStorageGroupProcedure
+ /**
+ * Ask every online DataNode to invalidate its schema cache and partition cache for the given
+ * StorageGroup.
+ *
+ * <p>There is no per-DataNode error handling: an exception from one DataNode propagates
+ * immediately, so DataNodes later in the list are not contacted.
+ */
private void invalidateCache(ConfigNodeProcedureEnv env, String storageGroupName)
throws IOException, TException {
- List<TDataNodeLocation> allDataNodes =
- env.getConfigManager().getNodeManager().getOnlineDataNodes();
+ List<TDataNodeInfo> allDataNodes =
+ env.getConfigManager().getNodeManager().getOnlineDataNodes(-1);
TInvalidateCacheReq invalidateCacheReq = new TInvalidateCacheReq();
invalidateCacheReq.setStorageGroup(true);
invalidateCacheReq.setFullPath(storageGroupName);
- for (TDataNodeLocation dataNodeLocation : allDataNodes) {
- env.getDataNodeClient(dataNodeLocation).invalidateSchemaCache(invalidateCacheReq);
- env.getDataNodeClient(dataNodeLocation).invalidatePartitionCache(invalidateCacheReq);
+ for (TDataNodeInfo dataNodeInfo : allDataNodes) {
+ env.getDataNodeClient(dataNodeInfo.getLocation()).invalidateSchemaCache(invalidateCacheReq);
+ env.getDataNodeClient(dataNodeInfo.getLocation())
+ .invalidatePartitionCache(invalidateCacheReq);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 2748f83e65..c9ec115ad2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -40,7 +40,7 @@ import org.apache.iotdb.confignode.consensus.request.write.SetTTLReq;
import org.apache.iotdb.confignode.consensus.request.write.SetTimePartitionIntervalReq;
import org.apache.iotdb.confignode.consensus.response.CountStorageGroupResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
-import org.apache.iotdb.confignode.consensus.response.DataNodeLocationsResp;
+import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.confignode.consensus.response.DataPartitionResp;
import org.apache.iotdb.confignode.consensus.response.PermissionInfoResp;
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
@@ -54,7 +54,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -107,7 +107,7 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) throws TException {
- RegisterDataNodeReq registerReq = new RegisterDataNodeReq(req.getDataNodeLocation());
+ RegisterDataNodeReq registerReq = new RegisterDataNodeReq(req.getDataNodeInfo());
DataNodeConfigurationResp registerResp =
(DataNodeConfigurationResp) configManager.registerDataNode(registerReq);
@@ -121,12 +121,11 @@ public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
}
+ /**
+ * Query DataNode info through the ConfigManager and convert the result into a TDataNodeInfoResp.
+ * NOTE(review): dataNodeID == -1 presumably requests all DataNodes (as NodeInfo.getDataNodeInfo
+ * documents); confirm against ConfigManager.getDataNodeInfo.
+ */
@Override
- public TDataNodeLocationResp getDataNodeLocations(int dataNodeID) throws TException {
+ public TDataNodeInfoResp getDataNodeInfo(int dataNodeID) throws TException {
GetDataNodeInfoReq queryReq = new GetDataNodeInfoReq(dataNodeID);
- DataNodeLocationsResp queryResp =
- (DataNodeLocationsResp) configManager.getDataNodeInfo(queryReq);
+ DataNodeInfosResp queryResp = (DataNodeInfosResp) configManager.getDataNodeInfo(queryReq);
- TDataNodeLocationResp resp = new TDataNodeLocationResp();
+ TDataNodeInfoResp resp = new TDataNodeInfoResp();
queryResp.convertToRpcDataNodeLocationResp(resp);
return resp;
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
index eaad26db7b..55bad48cf3 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigRequestSerDeTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.consensus.request;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -86,7 +87,13 @@ public class ConfigRequestSerDeTest {
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777));
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 7777));
- RegisterDataNodeReq req0 = new RegisterDataNodeReq(dataNodeLocation);
+
+ TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.setLocation(dataNodeLocation);
+ dataNodeInfo.setCpuCoreNum(16);
+ dataNodeInfo.setMaxMemory(34359738368L);
+
+ RegisterDataNodeReq req0 = new RegisterDataNodeReq(dataNodeInfo);
req0.serialize(buffer);
buffer.flip();
RegisterDataNodeReq req1 = (RegisterDataNodeReq) ConfigRequest.Factory.create(buffer);
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
index febddbe2ef..4f37010f1f 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/NodeInfoTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.persistence;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
@@ -61,10 +62,12 @@ public class NodeInfoTest {
@Test
public void testSnapshot() throws TException, IOException {
- RegisterDataNodeReq registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(1));
+ RegisterDataNodeReq registerDataNodeReq =
+ new RegisterDataNodeReq(new TDataNodeInfo(generateTDataNodeLocation(1), 16, 34359738368L));
nodeInfo.registerDataNode(registerDataNodeReq);
- registerDataNodeReq = new RegisterDataNodeReq(generateTDataNodeLocation(2));
+ registerDataNodeReq =
+ new RegisterDataNodeReq(new TDataNodeInfo(generateTDataNodeLocation(2), 16, 34359738368L));
nodeInfo.registerDataNode(registerDataNodeReq);
Set<TDataNodeLocation> drainingDataNodes_before = new HashSet<>();
@@ -75,7 +78,7 @@ public class NodeInfoTest {
nodeInfo.setDrainingDataNodes(drainingDataNodes_before);
int nextId = nodeInfo.getNextDataNodeId();
- List<TDataNodeLocation> onlineDataNodes_before = nodeInfo.getOnlineDataNodes();
+ List<TDataNodeInfo> onlineDataNodes_before = nodeInfo.getOnlineDataNodes(-1);
nodeInfo.processTakeSnapshot(snapshotDir);
nodeInfo.clear();
@@ -86,8 +89,7 @@ public class NodeInfoTest {
Set<TDataNodeLocation> drainingDataNodes_after = nodeInfo.getDrainingDataNodes();
Assert.assertEquals(drainingDataNodes_before, drainingDataNodes_after);
- List<TDataNodeLocation> onlineDataNodes_after = nodeInfo.getOnlineDataNodes();
-
+ List<TDataNodeInfo> onlineDataNodes_after = nodeInfo.getOnlineDataNodes(-1);
Assert.assertEquals(onlineDataNodes_before, onlineDataNodes_after);
}
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
index 2453255d54..278c15b698 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessorTest.java
@@ -42,7 +42,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -100,7 +100,7 @@ public class ConfigNodeRPCServiceProcessorTest {
}
@After
- public void after() throws IOException, InterruptedException {
+ public void after() throws IOException {
processor.close();
FileUtils.deleteFully(new File(ConfigNodeDescriptor.getInstance().getConf().getConsensusDir()));
FileUtils.deleteFully(
@@ -133,10 +133,11 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.setLocation(dataNodeLocation);
dataNodeInfo.setCpuCoreNum(8);
dataNodeInfo.setMaxMemory(1024 * 1024);
- TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation, dataNodeInfo);
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
TDataNodeRegisterResp resp = processor.registerDataNode(req);
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode());
@@ -157,10 +158,11 @@ public class ConfigNodeRPCServiceProcessorTest {
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ dataNodeInfo.setLocation(dataNodeLocation);
dataNodeInfo.setCpuCoreNum(8);
dataNodeInfo.setMaxMemory(1024 * 1024);
- TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeLocation, dataNodeInfo);
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq(dataNodeInfo);
TDataNodeRegisterResp resp = processor.registerDataNode(req);
Assert.assertEquals(
TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode(), resp.getStatus().getCode());
@@ -168,35 +170,34 @@ public class ConfigNodeRPCServiceProcessorTest {
checkGlobalConfig(resp.getGlobalConfig());
// test query DataNodeInfo
- TDataNodeLocationResp locationResp = processor.getDataNodeLocations(-1);
+ TDataNodeInfoResp infoResp = processor.getDataNodeInfo(-1);
Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), locationResp.getStatus().getCode());
- Map<Integer, TDataNodeLocation> locationMap = locationResp.getDataNodeLocationMap();
- Assert.assertEquals(3, locationMap.size());
- List<Map.Entry<Integer, TDataNodeLocation>> locationList =
- new ArrayList<>(locationMap.entrySet());
- locationList.sort(Comparator.comparingInt(Map.Entry::getKey));
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
+ Map<Integer, TDataNodeInfo> infoMap = infoResp.getDataNodeInfoMap();
+ Assert.assertEquals(3, infoMap.size());
+ List<Map.Entry<Integer, TDataNodeInfo>> infoList = new ArrayList<>(infoMap.entrySet());
+ infoList.sort(Comparator.comparingInt(Map.Entry::getKey));
for (int i = 0; i < 3; i++) {
dataNodeLocation.setDataNodeId(i);
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
- Assert.assertEquals(dataNodeLocation, locationList.get(i).getValue());
+ Assert.assertEquals(dataNodeLocation, infoList.get(i).getValue().getLocation());
}
- locationResp = processor.getDataNodeLocations(1);
+ infoResp = processor.getDataNodeInfo(1);
Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(), locationResp.getStatus().getCode());
- locationMap = locationResp.getDataNodeLocationMap();
- Assert.assertEquals(1, locationMap.size());
- Assert.assertNotNull(locationMap.get(1));
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(), infoResp.getStatus().getCode());
+ infoMap = infoResp.getDataNodeInfoMap();
+ Assert.assertEquals(1, infoMap.size());
+ Assert.assertNotNull(infoMap.get(1));
dataNodeLocation.setDataNodeId(1);
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6668));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9004));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8778));
dataNodeLocation.setConsensusEndPoint(new TEndPoint("0.0.0.0", 40011));
- Assert.assertEquals(dataNodeLocation, locationMap.get(1));
+ Assert.assertEquals(dataNodeLocation, infoMap.get(1).getLocation());
}
@Test
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
index e641081c06..e689d09c95 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/utils/ThriftCommonsSerDeUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -90,6 +91,24 @@ public class ThriftCommonsSerDeUtils {
return dataNodeLocation;
}
+ public static void serializeTDataNodeInfo(TDataNodeInfo dataNodeInfo, ByteBuffer buffer) {
+ try {
+ dataNodeInfo.write(generateWriteProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Write TDataNodeInfo failed: ", e);
+ }
+ }
+
+ public static TDataNodeInfo deserializeTDataNodeInfo(ByteBuffer buffer) {
+ TDataNodeInfo dataNodeInfo = new TDataNodeInfo();
+ try {
+ dataNodeInfo.read(generateReadProtocol(buffer));
+ } catch (TException e) {
+ throw new ThriftSerDeException("Read TDataNodeInfo failed: ", e);
+ }
+ return dataNodeInfo;
+ }
+
public static void serializeTSeriesPartitionSlot(
TSeriesPartitionSlot seriesPartitionSlot, ByteBuffer buffer) {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ecff7dbcd3..a5d34a576f 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
-import org.apache.iotdb.confignode.rpc.thrift.TDataNodeLocationResp;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -186,11 +186,10 @@ public class ConfigNodeClient {
throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
}
- public TDataNodeLocationResp getDataNodeLocations(int dataNodeID)
- throws IoTDBConnectionException {
+ public TDataNodeInfoResp getDataNodeInfos(int dataNodeID) throws IoTDBConnectionException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
- TDataNodeLocationResp resp = client.getDataNodeLocations(dataNodeID);
+ TDataNodeInfoResp resp = client.getDataNodeInfo(dataNodeID);
if (!updateConfigNodeLeader(resp.status)) {
return resp;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index bf77e28b2d..0edbf85f6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -143,7 +143,8 @@ public class DataNode implements DataNodeMBean {
try {
ConfigNodeClient configNodeClient = new ConfigNodeClient();
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+
+ // Set DataNodeLocation
TDataNodeLocation location = new TDataNodeLocation();
location.setDataNodeId(config.getDataNodeId());
location.setExternalEndPoint(new TEndPoint(config.getRpcAddress(), config.getRpcPort()));
@@ -153,13 +154,15 @@ public class DataNode implements DataNodeMBean {
new TEndPoint(config.getInternalIp(), config.getDataBlockManagerPort()));
location.setConsensusEndPoint(
new TEndPoint(config.getInternalIp(), config.getConsensusPort()));
- req.setDataNodeLocation(location);
+ // Set DataNodeInfo
TDataNodeInfo info = new TDataNodeInfo();
+ info.setLocation(location);
info.setCpuCoreNum(Runtime.getRuntime().availableProcessors());
info.setMaxMemory(Runtime.getRuntime().totalMemory());
- req.setDataNodeInfo(info);
+ TDataNodeRegisterReq req = new TDataNodeRegisterReq();
+ req.setDataNodeInfo(info);
TDataNodeRegisterResp dataNodeRegisterResp = configNodeClient.registerDataNode(req);
// store config node lists from resp
diff --git a/thrift-commons/src/main/thrift/common.thrift b/thrift-commons/src/main/thrift/common.thrift
index 5d3b70e9d5..9411682d1f 100644
--- a/thrift-commons/src/main/thrift/common.thrift
+++ b/thrift-commons/src/main/thrift/common.thrift
@@ -83,6 +83,7 @@ struct THeartbeatResp {
}
struct TDataNodeInfo {
- 1: required i32 cpuCoreNum
- 2: required i64 maxMemory
+ 1: required TDataNodeLocation location
+ 2: required i32 cpuCoreNum
+ 3: required i64 maxMemory
}
\ No newline at end of file
diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift
index 29b4a04449..f102e2aa3a 100644
--- a/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/thrift-confignode/src/main/thrift/confignode.thrift
@@ -23,11 +23,10 @@ namespace py iotdb.thrift.confignode
// DataNode
struct TDataNodeRegisterReq {
- 1: required common.TDataNodeLocation dataNodeLocation
- 2: required common.TDataNodeInfo dataNodeInfo
+ 1: required common.TDataNodeInfo dataNodeInfo
// Map<StorageGroupName, TStorageGroupSchema>
// DataNode can use statusMap to report its status to the ConfigNode when restart
- 3: optional map<string, TStorageGroupSchema> statusMap
+ 2: optional map<string, TStorageGroupSchema> statusMap
}
struct TGlobalConfig {
@@ -44,10 +43,10 @@ struct TDataNodeRegisterResp {
4: optional TGlobalConfig globalConfig
}
-struct TDataNodeLocationResp {
+struct TDataNodeInfoResp {
1: required common.TSStatus status
// map<DataNodeId, DataNodeLocation>
- 2: optional map<i32, common.TDataNodeLocation> dataNodeLocationMap
+ 2: optional map<i32, common.TDataNodeInfo> dataNodeInfoMap
}
// StorageGroup
@@ -181,7 +180,7 @@ service ConfigIService {
TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
- TDataNodeLocationResp getDataNodeLocations(i32 dataNodeId)
+ TDataNodeInfoResp getDataNodeInfo(i32 dataNodeId)
/* StorageGroup */