You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/05 09:09:17 UTC
[iotdb] 01/01: perfect remove process
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 399a961b8da27175ac1f4cd024522a3a1ee9f042
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sat Nov 5 17:08:13 2022 +0800
perfect remove process
---
.../persistence/partition/PartitionInfo.java | 2 +-
.../partition/StorageGroupPartitionTable.java | 6 +-
.../procedure/env/DataNodeRemoveHandler.java | 67 +++++++++++-----------
.../impl/node/RemoveDataNodeProcedure.java | 6 ++
4 files changed, 42 insertions(+), 39 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 812393b0a2..82a461375a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * update a region location
+ * Update the location info of given regionId
*
* @param req UpdateRegionLocationReq
* @return TSStatus
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ffd0881f5..5e98a80c3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -432,7 +432,7 @@ public class StorageGroupPartitionTable {
private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
- LOGGER.warn("not find Region Group for region {}", regionId);
+ LOGGER.warn("Cannot find RegionGroup in addRegionNewLocation for region {}", regionId);
return;
}
if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
@@ -445,12 +445,12 @@ public class StorageGroupPartitionTable {
private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
RegionGroup regionGroup = regionGroupMap.get(regionId);
if (regionGroup == null) {
- LOGGER.warn("not find Region Group for region {}", regionId);
+ LOGGER.warn("Cannot find RegionGroup in removeRegionOldLocation for region {}", regionId);
return;
}
if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
LOGGER.info(
- "Node is Not in region locations, no need to remove it, node: {}, region: {}",
+ "Node is not in region locations, no need to remove it, node: {}, region: {}",
node,
regionId);
return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index a5f1cc5b2e..8a48624046 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -138,11 +138,11 @@ public class DataNodeRemoveHandler {
*/
public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("not find region replica nodes, region: " + regionId);
+ status.setMessage("Cannot find region replica nodes, region: " + regionId);
return null;
}
@@ -167,10 +167,10 @@ public class DataNodeRemoveHandler {
*/
public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
TSStatus status;
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+ List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
if (regionReplicaNodes.isEmpty()) {
LOGGER.warn(
- "{}, Not find region replica nodes in createPeer, regionId: {}",
+ "{}, Cannot find region replica nodes in createPeer, regionId: {}",
REMOVE_DATANODE_PROCESS,
regionId);
status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -354,40 +354,40 @@ public class DataNodeRemoveHandler {
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode) {
LOGGER.info(
- "Start to update region {} location from {} to {} when it migrate succeed",
+ "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed",
regionId,
- originalDataNode.getInternalEndPoint().getIp(),
- destDataNode.getInternalEndPoint().getIp());
+ getIdWithRpcEndpoint(originalDataNode),
+ getIdWithRpcEndpoint(destDataNode));
UpdateRegionLocationPlan req =
new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
LOGGER.info(
- "Update region {} location finished, result:{}, old:{}, new:{}",
+ "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}",
regionId,
status,
- originalDataNode.getInternalEndPoint().getIp(),
- destDataNode.getInternalEndPoint().getIp());
+ getIdWithRpcEndpoint(originalDataNode),
+ getIdWithRpcEndpoint(destDataNode));
+
// Broadcast the latest RegionRouteMap when Region migration finished
configManager.getLoadManager().broadcastLatestRegionRouteMap();
}
/**
- * Find region replication Nodes
+ * Find all DataNodes that hold a replica of the given region
*
* @param regionId region id
- * @return data node location
+ * @return DataNode locations
*/
- public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
- List<TRegionReplicaSet> regionReplicaSets =
+ public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
+ Optional<TRegionReplicaSet> regionReplicaSet =
configManager.getPartitionManager().getAllReplicaSets().stream()
.filter(rg -> rg.regionId.equals(regionId))
- .collect(Collectors.toList());
- if (regionReplicaSets.isEmpty()) {
- LOGGER.warn("not find TRegionReplica for region: {}", regionId);
- return Collections.emptyList();
+ .findAny();
+ if (regionReplicaSet.isPresent()) {
+ return regionReplicaSet.get().getDataNodeLocations();
}
- return regionReplicaSets.get(0).getDataNodeLocations();
+ return Collections.emptyList();
}
private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
@@ -571,31 +571,28 @@ public class DataNodeRemoveHandler {
*/
private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
- List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
- if (regionReplicaNodes.isEmpty()) {
- LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+ List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
+ if (regionLocations.isEmpty()) {
+ LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
return Optional.empty();
}
+ // Prefer RUNNING DataNodes: they are listed first, so they are checked first
+ // REMOVING DataNodes are appended afterwards and serve only as a fallback
List<TDataNodeLocation> aliveDataNodes =
configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
- // filter the RUNNING datanode firstly
- // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
- // because REMOVING datanode is also alive, it can execute rpc request
- if (aliveDataNodes.isEmpty()) {
- aliveDataNodes =
- configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
- .map(TDataNodeConfiguration::getLocation)
- .collect(Collectors.toList());
- }
+ aliveDataNodes.addAll(
+ configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+ .map(TDataNodeConfiguration::getLocation)
+ .collect(Collectors.toList()));
// TODO return the node which has lowest load.
- for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
- if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
- return Optional.of(regionReplicaNode);
+ for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
+ if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
+ return Optional.of(aliveDataNode);
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index f8c6f7b83c..fe59a56d97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -196,6 +196,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
addChildProcedure(regionMigrateProcedure);
LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+ } else {
+ LOG.error(
+ "{}, Cannot find target DataNode to remove the region: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId);
+ // TODO: terminate all uncompleted remove-DataNode processes
}
});
}