You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2022/11/05 09:09:17 UTC

[iotdb] 01/01: perfect remove process

This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/master1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 399a961b8da27175ac1f4cd024522a3a1ee9f042
Author: Beyyes <cg...@foxmail.com>
AuthorDate: Sat Nov 5 17:08:13 2022 +0800

    perfect remove process
---
 .../persistence/partition/PartitionInfo.java       |  2 +-
 .../partition/StorageGroupPartitionTable.java      |  6 +-
 .../procedure/env/DataNodeRemoveHandler.java       | 67 +++++++++++-----------
 .../impl/node/RemoveDataNodeProcedure.java         |  6 ++
 4 files changed, 42 insertions(+), 39 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 812393b0a2..82a461375a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -470,7 +470,7 @@ public class PartitionInfo implements SnapshotProcessor {
   }
 
   /**
-   * update a region location
+   * Update the location info of the given regionId
    *
    * @param req UpdateRegionLocationReq
    * @return TSStatus
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
index 5ffd0881f5..5e98a80c3d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/StorageGroupPartitionTable.java
@@ -432,7 +432,7 @@ public class StorageGroupPartitionTable {
   private void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn("Cannot find RegionGroup in addRegionNewLocation for region {}", regionId);
       return;
     }
     if (regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
@@ -445,12 +445,12 @@ public class StorageGroupPartitionTable {
   private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) {
     RegionGroup regionGroup = regionGroupMap.get(regionId);
     if (regionGroup == null) {
-      LOGGER.warn("not find Region Group for region {}", regionId);
+      LOGGER.warn("Cannot find RegionGroup in removeRegionOldLocation for region {}", regionId);
       return;
     }
     if (!regionGroup.getReplicaSet().getDataNodeLocations().contains(node)) {
       LOGGER.info(
-          "Node is Not in region locations, no need to remove it, node: {}, region: {}",
+          "Node is not in region locations, no need to remove it, node: {}, region: {}",
           node,
           regionId);
       return;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index a5f1cc5b2e..8a48624046 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -138,11 +138,11 @@ public class DataNodeRemoveHandler {
    */
   public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+      LOGGER.warn("Cannot find region replica nodes, region: {}", regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-      status.setMessage("not find region replica nodes, region: " + regionId);
+      status.setMessage("Cannot find region replica nodes, region: " + regionId);
       return null;
     }
 
@@ -167,10 +167,10 @@ public class DataNodeRemoveHandler {
    */
   public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
     TSStatus status;
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
+    List<TDataNodeLocation> regionReplicaNodes = findRegionLocations(regionId);
     if (regionReplicaNodes.isEmpty()) {
       LOGGER.warn(
-          "{}, Not find region replica nodes in createPeer, regionId: {}",
+          "{}, Cannot find region replica nodes in createPeer, regionId: {}",
           REMOVE_DATANODE_PROCESS,
           regionId);
       status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
@@ -354,40 +354,40 @@ public class DataNodeRemoveHandler {
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode) {
     LOGGER.info(
-        "Start to update region {} location from {} to {} when it migrate succeed",
+        "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed",
         regionId,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+        getIdWithRpcEndpoint(originalDataNode),
+        getIdWithRpcEndpoint(destDataNode));
     UpdateRegionLocationPlan req =
         new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
     TSStatus status = configManager.getPartitionManager().updateRegionLocation(req);
     LOGGER.info(
-        "Update region {} location finished, result:{}, old:{}, new:{}",
+        "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}",
         regionId,
         status,
-        originalDataNode.getInternalEndPoint().getIp(),
-        destDataNode.getInternalEndPoint().getIp());
+            getIdWithRpcEndpoint(originalDataNode),
+            getIdWithRpcEndpoint(destDataNode));
+
     // Broadcast the latest RegionRouteMap when Region migration finished
     configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
 
   /**
-   * Find region replication Nodes
+   * Find all DataNodes which contain the given regionId
    *
    * @param regionId region id
-   * @return data node location
+   * @return DataNode locations
    */
-  public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
-    List<TRegionReplicaSet> regionReplicaSets =
+  public List<TDataNodeLocation> findRegionLocations(TConsensusGroupId regionId) {
+    Optional<TRegionReplicaSet> regionReplicaSet =
         configManager.getPartitionManager().getAllReplicaSets().stream()
             .filter(rg -> rg.regionId.equals(regionId))
-            .collect(Collectors.toList());
-    if (regionReplicaSets.isEmpty()) {
-      LOGGER.warn("not find TRegionReplica for region: {}", regionId);
-      return Collections.emptyList();
+            .findAny();
+    if (regionReplicaSet.isPresent()) {
+      return regionReplicaSet.get().getDataNodeLocations();
     }
 
-    return regionReplicaSets.get(0).getDataNodeLocations();
+    return Collections.emptyList();
   }
 
   private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(
@@ -571,31 +571,28 @@ public class DataNodeRemoveHandler {
    */
   private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica(
       TConsensusGroupId regionId, TDataNodeLocation filterLocation) {
-    List<TDataNodeLocation> regionReplicaNodes = findRegionReplicaNodes(regionId);
-    if (regionReplicaNodes.isEmpty()) {
-      LOGGER.warn("Not find region replica nodes, region: {}", regionId);
+    List<TDataNodeLocation> regionLocations = findRegionLocations(regionId);
+    if (regionLocations.isEmpty()) {
+      LOGGER.warn("Cannot find DataNodes contain the given region: {}", regionId);
       return Optional.empty();
     }
 
+    // Prefer RUNNING DataNodes: they are placed first in the candidate list
+    // REMOVING DataNodes are appended after them and are chosen only if no RUNNING one matches
     List<TDataNodeLocation> aliveDataNodes =
         configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream()
             .map(TDataNodeConfiguration::getLocation)
             .collect(Collectors.toList());
 
-    // filter the RUNNING datanode firstly
-    // if all the datanodes are not in RUNNING status, choose the REMOVING datanode
-    // because REMOVING datanode is also alive, it can execute rpc request
-    if (aliveDataNodes.isEmpty()) {
-      aliveDataNodes =
-          configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
-              .map(TDataNodeConfiguration::getLocation)
-              .collect(Collectors.toList());
-    }
+    aliveDataNodes.addAll(
+        configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Removing).stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toList()));
 
     // TODO return the node which has lowest load.
-    for (TDataNodeLocation regionReplicaNode : regionReplicaNodes) {
-      if (aliveDataNodes.contains(regionReplicaNode) && !regionReplicaNode.equals(filterLocation)) {
-        return Optional.of(regionReplicaNode);
+    for (TDataNodeLocation aliveDataNode : aliveDataNodes) {
+      if (regionLocations.contains(aliveDataNode) && !aliveDataNode.equals(filterLocation)) {
+        return Optional.of(aliveDataNode);
       }
     }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index f8c6f7b83c..fe59a56d97 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -196,6 +196,12 @@ public class RemoveDataNodeProcedure extends AbstractNodeProcedure<RemoveDataNod
                 new RegionMigrateProcedure(regionId, disableDataNodeLocation, destDataNode);
             addChildProcedure(regionMigrateProcedure);
             LOG.info("Submit child procedure {} for regionId {}", regionMigrateProcedure, regionId);
+          } else {
+            LOG.error(
+                "{}, Cannot find target DataNode to remove the region: {}",
+                REMOVE_DATANODE_PROCESS,
+                regionId);
+            // TODO terminate all the uncompleted remove-DataNode processes
           }
         });
   }